From ac27d62f3316e11729fe4e06add50dd053206afb Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 11 May 2022 02:29:53 +0000 Subject: [PATCH] refact data code --- include/common/tdataformat.h | 73 ++++++++++++++++----- include/common/tmsg.h | 7 +- source/common/src/tdataformat.c | 32 +++++++-- source/dnode/vnode/src/tq/tq.c | 6 +- source/libs/parser/src/parTranslater.c | 4 +- source/libs/parser/test/parInitialCTest.cpp | 2 +- 6 files changed, 92 insertions(+), 32 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 1f3b787538..5a977b22e7 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -18,6 +18,7 @@ #include "os.h" #include "talgo.h" +#include "tencode.h" #include "ttypes.h" #include "tutil.h" @@ -25,6 +26,59 @@ extern "C" { #endif +typedef struct SSchema SSchema; +typedef struct STColumn STColumn; +typedef struct STSchema STSchema; +typedef struct STSRow2 STSRow2; +typedef struct STSRowBuilder STSRowBuilder; + +#define TD_KV_ROW 0x1U + +// STSchema + +// STSRow2 +int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow); +int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow); + +// STSchema +int32_t tTSchemaCreate(STSchema **ppTSchema); +int32_t tTSchemaDestroy(STSchema *pTSchema); + +// STRUCT ================= +struct STColumn { + col_id_t colId; + int8_t type; + int8_t flags; + int32_t bytes; + int32_t offset; +}; + +struct STSchema { + int32_t numOfCols; + schema_ver_t version; + uint16_t flen; + int32_t vlen; + int32_t tlen; + STColumn columns[]; +}; + +struct STSRow2 { + TSKEY ts; + uint32_t flags; + union { + int32_t sver; + int32_t ncols; + }; + uint32_t nData; + const uint8_t *pData; +}; + +struct STSRowBuilder { + STSchema *pTSchema; + STSRow2 row; +}; + +#if 1 //==================================== // Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap. #define TD_SUPPORT_BITMAP #define TD_SUPPORT_READ2 @@ -59,15 +113,6 @@ extern "C" { } while (0); // ----------------- TSDB COLUMN DEFINITION -#pragma pack(push, 1) -typedef struct { - col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1)) - int8_t type; // column type - int8_t flags; // flags: 0 no index, 1 SCHEMA_SMA_ON, 2 SCHEMA_IDX_ON - int32_t bytes; // column bytes (0~16M) - int32_t offset; // point offset in STpRow after the header part. -} STColumn; -#pragma pack(pop) #define colType(col) ((col)->type) #define colFlags(col) ((col)->flags) @@ -82,15 +127,6 @@ typedef struct { #define colSetOffset(col, o) (colOffset(col) = (o)) // ----------------- TSDB SCHEMA DEFINITION -typedef struct { - int32_t numOfCols; // Number of columns appended - schema_ver_t version; // schema version - uint16_t flen; // First part length in a STpRow after the header part - int32_t vlen; // pure value part length, excluded the overhead (bytes only) - int32_t tlen; // maximum length of a STpRow without the header part - // (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + (bytes)) - STColumn columns[]; -} STSchema; #define schemaNCols(s) ((s)->numOfCols) #define schemaVersion(s) ((s)->version) @@ -386,6 +422,7 @@ static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t co return 0; } +#endif #ifdef __cplusplus } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index ff2e419c75..26bfa997f7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -267,8 +267,9 @@ typedef struct { SSubmitRspBlock failedBlocks[]; } SSubmitRsp; -#define SCHEMA_SMA_ON 0x1 -#define SCHEMA_IDX_ON 0x2 +#define COL_SMA_ON ((int8_t)0x1) +#define COL_IDX_ON ((int8_t)0x2) +#define COL_VAL_SET ((int8_t)0x4) typedef struct SSchema { int8_t type; int8_t flags; @@ -277,7 +278,7 @@ typedef struct SSchema { char name[TSDB_COL_NAME_LEN]; } SSchema; -#define IS_BSMA_ON(s) (((s)->flags & 0x01) == SCHEMA_SMA_ON) +#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON) #define SSCHMEA_TYPE(s) ((s)->type) #define SSCHMEA_FLAGS(s) ((s)->flags) diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 5d893fe398..558190a8ab 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -19,12 +19,33 @@ #include "tdatablock.h" #include "tlog.h" +int32_t tEncodeTSRow(SEncoder *pEncoder, const STSRow2 *pRow) { + if (tEncodeI64(pEncoder, pRow->ts) < 0) return -1; + if (tEncodeU32v(pEncoder, pRow->flags) < 0) return -1; + if (pRow->flags & TD_KV_ROW) { + if (tEncodeI32v(pEncoder, pRow->ncols) < 0) return -1; + } else { + if (tEncodeI32v(pEncoder, pRow->sver) < 0) return -1; + } + if (tEncodeBinary(pEncoder, pRow->pData, pRow->nData) < 0) return -1; + return 0; +} + +int32_t tDecodeTSRow(SDecoder *pDecoder, STSRow2 *pRow) { + if (tDecodeI64(pDecoder, &pRow->ts) < 0) return -1; + if (tDecodeU32v(pDecoder, &pRow->flags) < 0) return -1; + if (pRow->flags & TD_KV_ROW) { + if (tDecodeI32v(pDecoder, &pRow->ncols) < 0) return -1; + } else { + if (tDecodeI32v(pDecoder, &pRow->sver) < 0) return -1; + } + if (tDecodeBinary(pDecoder, &pRow->pData, &pRow->nData) < 0) return -1; + return 0; +} + +#if 1 // ==================== static void dataColSetNEleNull(SDataCol *pCol, int nEle); -#if 0 -static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, - int limit2, int tRows, bool forceSetNull); -#endif -int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { +int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { int spaceNeeded = pCol->bytes * maxPoints; if (IS_VAR_DATA_TYPE(pCol->type)) { spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; @@ -504,3 +525,4 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { return row; } +#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6ca60945cd..ef32216810 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -66,9 +66,9 @@ static void tdSRowDemo() { SRowBuilder rb = {0}; SSchema schema[DEMO_N_COLS] = { - {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .name = "ts", .bytes = 8, .flags = SCHEMA_SMA_ON}, - {.type = TSDB_DATA_TYPE_INT, .colId = 2, .name = "c1", .bytes = 4, .flags = SCHEMA_SMA_ON}, - {.type = TSDB_DATA_TYPE_INT, .colId = 3, .name = "c2", .bytes = 4, .flags = SCHEMA_SMA_ON}}; + {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .name = "ts", .bytes = 8, .flags = COL_SMA_ON}, + {.type = TSDB_DATA_TYPE_INT, .colId = 2, .name = "c1", .bytes = 4, .flags = COL_SMA_ON}, + {.type = TSDB_DATA_TYPE_INT, .colId = 3, .name = "c2", .bytes = 4, .flags = COL_SMA_ON}}; SSchema* pSchema = schema; STSchema* pTSChema = tdGetSTSChemaFromSSChema(&pSchema, numOfCols); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index cdcb2592a7..5b984d766c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2138,7 +2138,7 @@ static int32_t columnDefNodeToField(SNodeList* pList, SArray** pArray) { SField field = {.type = pCol->dataType.type, .bytes = calcTypeBytes(pCol->dataType)}; strcpy(field.name, pCol->colName); if (pCol->sma) { - field.flags |= SCHEMA_SMA_ON; + field.flags |= COL_SMA_ON; } taosArrayPush(*pArray, &field); } @@ -2321,7 +2321,7 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt static void toSchema(const SColumnDefNode* pCol, col_id_t colId, SSchema* pSchema) { int8_t flags = 0; if (pCol->sma) { - flags |= SCHEMA_SMA_ON; + flags |= COL_SMA_ON; } pSchema->colId = colId; pSchema->type = pCol->dataType.type; diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index cf364aba5c..80921f59a9 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -117,7 +117,7 @@ TEST_F(ParserInitialCTest, createStable) { }; auto addFieldToCreateStbReqFunc = [&](bool col, const char* pFieldName, uint8_t type, int32_t bytes = 0, - int8_t flags = SCHEMA_SMA_ON) { + int8_t flags = COL_SMA_ON) { SField field = {0}; strcpy(field.name, pFieldName); field.type = type; -- GitLab