From d71fece2aa6a135e289636dcd5a37dce622af963 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 9 Feb 2022 20:34:11 +0800 Subject: [PATCH] STSRow refactor --- include/common/tdataformat.h | 43 +- include/common/tmsg.h | 6 +- include/common/trow.h | 289 ++++++--- source/client/test/clientTests.cpp | 10 +- source/common/src/tdataformat.c | 38 +- source/common/src/tmsg.c | 10 +- source/common/src/trow.c | 651 +++------------------ source/dnode/vnode/src/inc/tsdbMemTable.h | 8 +- source/dnode/vnode/src/inc/tsdbReadImpl.h | 4 +- source/dnode/vnode/src/tq/tq.c | 16 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 60 +- source/dnode/vnode/src/tsdb/tsdbMain.c | 4 +- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 110 ++-- source/dnode/vnode/src/tsdb/tsdbRead.c | 107 ++-- source/dnode/vnode/src/tsdb/tsdbReadImpl.c | 69 ++- source/libs/parser/inc/dataBlockMgt.h | 35 +- source/libs/parser/src/dataBlockMgt.c | 115 +--- source/libs/parser/src/insertParser.c | 49 +- source/libs/parser/src/parserUtil.c | 8 +- 19 files changed, 645 insertions(+), 987 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 7558b8b41f..4a73cf7dd2 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -26,6 +26,7 @@ extern "C" { // Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap. #define TD_SUPPORT_BITMAP +#define TD_SUPPORT_READ2 #define TD_SUPPORT_BACK2 // suppport back compatibility of 2.0 #define TASSERT(x) ASSERT(x) @@ -61,7 +62,7 @@ typedef struct { int8_t type; // Column type col_id_t colId; // column ID(start from PRIMARYKEY_TIMESTAMP_COL_ID(1)) int16_t bytes; // column bytes (restore to int16_t in case of misuse) - uint16_t offset; // point offset in SDataRow after the header part. + uint16_t offset; // point offset in STpRow after the header part. } STColumn; #define colType(col) ((col)->type) @@ -78,9 +79,9 @@ typedef struct { typedef struct { int version; // version int numOfCols; // Number of columns appended - int tlen; // maximum length of a SDataRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + - // (bytes)) - uint16_t flen; // First part length in a SDataRow after the header part + int tlen; // maximum length of a STpRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + + // (bytes)) + uint16_t flen; // First part length in a STpRow after the header part uint16_t vlen; // pure value part length, excluded the overhead (bytes only) STColumn columns[]; } STSchema; @@ -161,19 +162,19 @@ typedef uint64_t TKEY; #else -typedef uint64_t TKEY; +// typedef uint64_t TKEY; +#define TKEY TSKEY #define TKEY_INVALID UINT64_MAX #define TKEY_NULL TKEY_INVALID #define TKEY_NEGATIVE_FLAG (((TKEY)1) << 63) -#define TKEY_DELETE_FLAG (((TKEY)1) << 62) -#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG | TKEY_DELETE_FLAG)) +#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG)) #define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0) -#define TKEY_IS_DELETED(tkey) (((tkey)&TKEY_DELETE_FLAG) != 0) -#define tdSetTKEYDeleted(tkey) ((tkey) | TKEY_DELETE_FLAG) -#define tdGetTKEY(key) (((TKEY)TABS(key)) | (TKEY_NEGATIVE_FLAG & (TKEY)(key))) -#define tdGetKey(tkey) (((TSKEY)((tkey)&TKEY_VALUE_FILTER)) * (TKEY_IS_NEGATIVE(tkey) ? -1 : 1)) +#define TKEY_IS_DELETED(tkey) (false) + +#define tdGetTKEY(key) (key) +#define tdGetKey(tskey) (tskey) #define MIN_TS_KEY ((TSKEY)0x8000000000000001) #define MAX_TS_KEY ((TSKEY)0x7fffffffffffffff) @@ -206,6 +207,7 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) { } } +#if 0 // ----------------- Data row structure /* A data row, the format is like below: @@ -355,10 +357,12 @@ static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSche memcpy(pData, value, pSrcSchema->columns[srcIdx].bytes); } } - +#endif // ----------------- Data column structure typedef struct SDataCol { - int8_t type; // column type + int8_t type; // column type + uint8_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM + uint8_t reserve : 7; int16_t colId; // column ID int bytes; // column data bytes defined int offset; // data offset in a SDataRow (including the header size) @@ -366,7 +370,7 @@ typedef struct SDataCol { int len; // column data length VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column void * pData; // Actual data pointer - void * pBitmap; // Bitmap pointer to mark Null/Norm(1 bit for each row) + void * pBitmap; // Bitmap pointer TSKEY ts; // only used in last NULL column } SDataCol; @@ -378,10 +382,11 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints); void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints); int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints); -void dataColSetOffset(SDataCol *pCol, int nEle); +void *dataColSetOffset(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle); +#if 0 // Get the data pointer from a column-wised data static FORCE_INLINE const void *tdGetColDataOfRow(SDataCol *pCol, int row) { if (isAllRowsNull(pCol)) { @@ -403,7 +408,7 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { return TYPE_BYTES[pDataCol->type] * rows; } } - +#endif typedef struct { col_id_t maxCols; // max number of columns col_id_t numOfCols; // Total number of cols @@ -521,6 +526,7 @@ static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) { return taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ); } +#if 0 // offset here not include kvRow header length static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, bool isCopyValData, int16_t colId, int8_t type, int32_t offset) { @@ -567,7 +573,7 @@ static FORCE_INLINE void *tdGetKVRowValOfColEx(SKVRow row, int16_t colId, int32_ } return NULL; } - +#endif // ----------------- K-V data row builder typedef struct { int16_t tCols; @@ -611,7 +617,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, return 0; } - +#if 0 // ----------------- SMemRow appended with tuple row structure /* * |---------|------------------------------------------------- len ---------------------------------->| @@ -770,6 +776,7 @@ static FORCE_INLINE void setSColInfo(SColInfo *colInfo, int16_t colId, uint8_t c } SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2); +#endif #ifdef __cplusplus } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 70e76517c6..398478b4e0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -25,7 +25,7 @@ extern "C" { #include "taoserror.h" #include "tarray.h" #include "tcoding.h" -#include "tdataformat.h" +#include "trow.h" #include "thash.h" #include "tlist.h" @@ -212,7 +212,7 @@ typedef struct { typedef struct { int32_t totalLen; int32_t len; - SMemRow row; + STSRow *row; } SSubmitBlkIter; typedef struct { @@ -224,7 +224,7 @@ typedef struct { int tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter); int tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); -SMemRow tGetSubmitBlkNext(SSubmitBlkIter* pIter); +STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); typedef struct { int32_t index; // index of failed block in submit blocks diff --git a/include/common/trow.h b/include/common/trow.h index ec8b38df46..323c430a9d 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -20,45 +20,46 @@ #include "talgo.h" #include "taoserror.h" #include "tbuffer.h" -#include "tdataformat.h" #include "tdef.h" +#include "taosdef.h" +#include "tdataformat.h" #include "tschema.h" #include "ttypes.h" +#include "tutil.h" #ifdef __cplusplus extern "C" { #endif -// Target of trow.h: -// 1. Row related definition in dataformat.h of 2.0 could be replaced with trow.h of 3.0. -// 2. The basic definition in dataformat.h is shared with trow.h of 3.0. +// Target of tdataformat.h: +// 1. Row related definition in dataformat.h of 2.0 could be replaced with tdataformat.h of 3.0. +// 2. The basic definition in dataformat.h is shared with tdataformat.h of 3.0. // row type -#define TD_ROW_TP 0x0 // default -#define TD_ROW_KV 0x01 +#define TD_ROW_TP 0x0U // default +#define TD_ROW_KV 0x01U /** * @brief val type * - for data from client input and STSRow in memory, 3 types of val none/null/norm available - * - for data in + * - for data in */ -#define TD_VTYPE_NORM 0x0U // normal val: not none, not null -#define TD_VTYPE_NONE 0x01U // none or unknown/undefined -#define TD_VTYPE_NULL 0x02U // null val +#define TD_VTYPE_NONE 0x0U // none or unknown/undefined +#define TD_VTYPE_NULL 0x01U // null val +#define TD_VTYPE_NORM 0x02U // normal val: not none, not null +#define TD_VTYPE_MAX 0x03U // #define TD_VTYPE_NORM_BYTE 0x0U #define TD_VTYPE_NONE_BYTE 0x55U #define TD_VTYPE_NULL_BYTE 0xAAU - - #define KvConvertRatio (0.9f) #define isSelectKVRow(klen, tlen) ((klen) < ((tlen)*KvConvertRatio)) #ifdef TD_SUPPORT_BITMAP -static FORCE_INLINE bool tdValTypeIsNorm(TDRowValT valType) { return (valType & TD_VTYPE_NORM); } -static FORCE_INLINE bool tdValTypeIsNone(TDRowValT valType) { return (valType & TD_VTYPE_NONE); } -static FORCE_INLINE bool tdValTypeIsNull(TDRowValT valType) { return (valType & TD_VTYPE_NULL); } +static FORCE_INLINE bool tdValTypeIsNone(TDRowValT valType) { return (valType & 0x03U) == TD_VTYPE_NONE; } +static FORCE_INLINE bool tdValTypeIsNull(TDRowValT valType) { return (valType & 0x03U) == TD_VTYPE_NULL; } +static FORCE_INLINE bool tdValTypeIsNorm(TDRowValT valType) { return (valType & 0x03U) == TD_VTYPE_NORM; } #endif static FORCE_INLINE bool tdValIsNorm(TDRowValT valType, const void *val, int32_t colType) { @@ -137,6 +138,24 @@ typedef struct { char data[]; } STSRow; +typedef struct { + // basic info + int8_t rowType; + int16_t sver; + STSRow *pBuf; + + // extended info + int32_t flen; + int16_t nBoundCols; + int16_t nCols; + int16_t nBitmaps; + int16_t nBoundBitmaps; + int32_t offset; + void * pBitmap; + void * pOffset; + int32_t extendedRowSize; +} SRowBuilder; + #define TD_ROW_HEAD_LEN (sizeof(STSRow)) #define TD_ROW_TYPE(r) ((r)->type) @@ -146,7 +165,8 @@ typedef struct { #define TD_ROW_NCOLS(r) ((r)->ncols) #define TD_ROW_DATA(r) ((r)->data) #define TD_ROW_LEN(r) ((r)->len) -#define TD_ROW_TSKEY(r) ((r)->ts) +#define TD_ROW_KEY(r) ((r)->ts) +#define TD_ROW_KEY_ADDR(r) POINTER_SHIFT((r), 16) // N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and // (int)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined. @@ -157,14 +177,22 @@ typedef struct { #define TD_ROW_SET_SVER(r, v) (TD_ROW_SVER(r) = (v)) #define TD_ROW_SET_LEN(r, l) (TD_ROW_LEN(r) = (l)) -#define TD_ROW_IS_DELETED(r) (TD_ROW_DELETE(r)) +#define TD_ROW_IS_DELETED(r) (TD_ROW_DELETE(r) == 1) #define TD_IS_TP_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_TP) #define TD_IS_KV_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_KV) +#define TD_IS_TP_ROW_T(t) ((t) == TD_ROW_TP) +#define TD_IS_KV_ROW_T(t) ((t) == TD_ROW_KV) + #define TD_BOOL_STR(b) ((b) ? "true" : "false") #define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert)) #define TD_KV_ROW_COL_IDX(r) TD_ROW_DATA(r) +static FORCE_INLINE void tdRowCpy(void *dst, const STSRow *pRow) { memcpy(dst, pRow, TD_ROW_LEN(pRow)); } +static FORCE_INLINE const char *tdRowEnd(STSRow *pRow) { return (const char *)POINTER_SHIFT(pRow, TD_ROW_LEN(pRow)); } + +STSRow *tdRowDup(STSRow *row); + static FORCE_INLINE SKvRowIdx *tdKvRowColIdxAt(STSRow *pRow, uint16_t idx) { return (SKvRowIdx *)TD_KV_ROW_COL_IDX(pRow) + idx; } @@ -174,30 +202,32 @@ static FORCE_INLINE void *tdKVRowColVal(STSRow *pRow, SKvRowIdx *pIdx) { return static FORCE_INLINE void tdRowCopy(void *dst, STSRow *row) { memcpy(dst, row, TD_ROW_LEN(row)); } static FORCE_INLINE int32_t tdSetBitmapValType(void *pBitmap, int16_t colIdx, TDRowValT valType); -static FORCE_INLINE int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDRowValT valType); +int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDRowValT valType); static FORCE_INLINE int32_t tdGetBitmapValType(void *pBitmap, int16_t colIdx, TDRowValT *pValType); -static FORCE_INLINE int32_t tdAppendColValToTpRow(STSRow *row, void *pBitmap, TDRowValT valType, const void *val, +int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints); +static FORCE_INLINE int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData, int8_t colType, int16_t colIdx, int32_t offset); -static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, TDRowValT valType, const void *val, +static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData, int8_t colType, int16_t colIdx, int32_t offset, col_id_t colId); +int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols, bool forceSetNull); /** - * @brief - * - * @param pRow + * @brief + * + * @param pRow * @param flen flen in STSchema - * @return FORCE_INLINE* + * @return FORCE_INLINE* */ static FORCE_INLINE void *tdGetBitmapAddrTp(STSRow *pRow, uint32_t flen) { // The primary TS key is stored separatedly. - return POINTER_SHIFT(pRow->data, flen - sizeof(TSDB_DATA_TYPE_TIMESTAMP)); - // return POINTER_SHIFT(pRow->ts, flen); + return POINTER_SHIFT(pRow->data, flen - sizeof(TSKEY)); + // return POINTER_SHIFT(pRow->ts, flen); } static FORCE_INLINE void *tdGetBitmapAddrKv(STSRow *pRow, col_id_t nKvCols) { // The primary TS key is stored separatedly and is Norm value, thus should minus 1 firstly - return POINTER_SHIFT(pRow, (--nKvCols) * sizeof(SKvRowIdx)); + return POINTER_SHIFT(pRow->data, (--nKvCols) * sizeof(SKvRowIdx)); } static FORCE_INLINE void *tdGetBitmapAddr(STSRow *pRow, uint8_t rowType, uint32_t flen, col_id_t nKvCols) { #ifdef TD_SUPPORT_BITMAP @@ -213,17 +243,13 @@ static FORCE_INLINE void *tdGetBitmapAddr(STSRow *pRow, uint8_t rowType, uint32_ return NULL; } -// void tdFreeTpRow(STpRow row); -// void tdInitTpRow(STpRow row, STSchema *pSchema); -// STpRow tdTpRowDup(STpRow row); - /** - * @brief - * - * @param pBitmap + * @brief + * + * @param pBitmap * @param colIdx The relative index of colId, may have minus value as parameter. - * @param valType - * @return FORCE_INLINE + * @param valType + * @return FORCE_INLINE */ static FORCE_INLINE int32_t tdSetBitmapValType(void *pBitmap, int16_t colIdx, TDRowValT valType) { if (!pBitmap || colIdx < 0) { @@ -315,22 +341,7 @@ static FORCE_INLINE int32_t tdGetBitmapValType(void *pBitmap, int16_t colIdx, TD * +---------------------------------+------------+------------------------+ * */ -typedef struct { - // basic info - int8_t rowType; - int16_t sver; - STSRow *pBuf; - // extended info - int32_t flen; - int16_t nBoundCols; - int16_t nCols; - int16_t nBitmaps; - int16_t nBoundBitmaps; - int32_t offset; - void * pBitmap; - void * pOffset; -} SRowBuilder; /** * @brief @@ -340,7 +351,7 @@ typedef struct { * @return FORCE_INLINE */ static FORCE_INLINE void tdSRowInit(SRowBuilder *pBuilder, int16_t sver) { - pBuilder->rowType = TD_ROW_TP; // default STpRow + pBuilder->rowType = TD_ROW_TP; // default STpRow pBuilder->sver = sver; } @@ -353,37 +364,74 @@ static FORCE_INLINE void tdSRowInit(SRowBuilder *pBuilder, int16_t sver) { */ static FORCE_INLINE void tdSRowSetRowType(SRowBuilder *pBuilder, int8_t rowType) { pBuilder->rowType = rowType; } +/** + * @brief + * + * @param pBuilder + * @param nCols + * @param nBoundCols use -1 if not available + * @param flen + * @return FORCE_INLINE + */ +static FORCE_INLINE int32_t tdSRowSetInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBoundCols, int32_t flen) { + pBuilder->flen = flen; + pBuilder->nCols = nCols; + pBuilder->nBoundCols = nBoundCols; + if (pBuilder->flen <= 0 || pBuilder->nCols <= 0) { + TASSERT(0); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } +#ifdef TD_SUPPORT_BITMAP + // the primary TS key is stored separatedly + pBuilder->nBitmaps = (int16_t)TD_BITMAP_BYTES(pBuilder->nCols - 1); + if (nBoundCols > 0) { + pBuilder->nBoundBitmaps = (int16_t)TD_BITMAP_BYTES(pBuilder->nBoundCols - 1); + } else { + pBuilder->nBoundBitmaps = 0; + } +#else + pBuilder->nBitmaps = 0; + pBuilder->nBoundBitmaps = 0; +#endif + return TSDB_CODE_SUCCESS; +} + /** * @brief To judge row type: STpRow/SKvRow * * @param pBuilder - * @param allNullLen use -1 if not available - * @param boundNullLen use -1 if not available * @param nCols * @param nBoundCols * @param flen + * @param allNullLen use -1 if not available + * @param boundNullLen use -1 if not available * @return FORCE_INLINE */ -static FORCE_INLINE int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t allNullLen, int32_t boundNullLen, - int32_t nCols, int32_t nBoundCols, int32_t flen) { +static FORCE_INLINE int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBoundCols, + int32_t flen, int32_t allNullLen, int32_t boundNullLen) { if ((boundNullLen > 0) && (allNullLen > 0) && isSelectKVRow(boundNullLen, allNullLen)) { pBuilder->rowType = TD_ROW_KV; } else { pBuilder->rowType = TD_ROW_TP; } - - pBuilder->nBoundCols = nBoundCols; - pBuilder->nCols = nCols; + pBuilder->flen = flen; - if (pBuilder->flen <= 0 || pBuilder->nCols <= 0 || pBuilder->nBoundCols <= 0) { + pBuilder->nCols = nCols; + pBuilder->nBoundCols = nBoundCols; + if (pBuilder->flen <= 0 || pBuilder->nCols <= 0) { TASSERT(0); terrno = TSDB_CODE_INVALID_PARA; return terrno; } #ifdef TD_SUPPORT_BITMAP // the primary TS key is stored separatedly - pBuilder->nBitmaps = (int16_t)TD_BITMAP_BYTES(pBuilder->nCols - 1); - pBuilder->nBoundBitmaps = (int16_t)TD_BITMAP_BYTES(pBuilder->nBoundCols - 1); + pBuilder->nBitmaps = (int16_t)TD_BITMAP_BYTES(pBuilder->nCols - 1); + if (nBoundCols > 0) { + pBuilder->nBoundBitmaps = (int16_t)TD_BITMAP_BYTES(pBuilder->nBoundCols - 1); + } else { + pBuilder->nBoundBitmaps = 0; + } #else pBuilder->nBitmaps = 0; pBuilder->nBoundBitmaps = 0; @@ -397,13 +445,16 @@ static FORCE_INLINE int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t * @param pBuilder * @param pBuf Output buffer of STSRow */ -static FORCE_INLINE int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { +static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { pBuilder->pBuf = (STSRow *)pBuf; if (!pBuilder->pBuf) { TASSERT(0); terrno = TSDB_CODE_INVALID_PARA; return terrno; } + + TD_ROW_SET_TYPE(pBuilder->pBuf, pBuilder->rowType); + uint32_t len = 0; switch (pBuilder->rowType) { case TD_ROW_TP: @@ -411,7 +462,7 @@ static FORCE_INLINE int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { pBuilder->pBitmap = tdGetBitmapAddrTp(pBuilder->pBuf, pBuilder->flen); #endif // the primary TS key is stored separatedly - len = TD_ROW_HEAD_LEN + pBuilder->flen - sizeof(TSDB_DATA_TYPE_TIMESTAMP) + pBuilder->nBitmaps; + len = TD_ROW_HEAD_LEN + pBuilder->flen - sizeof(TSKEY) + pBuilder->nBitmaps; TD_ROW_SET_LEN(pBuilder->pBuf, len); TD_ROW_SET_SVER(pBuilder->pBuf, pBuilder->sver); break; @@ -445,7 +496,7 @@ static FORCE_INLINE int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { */ static FORCE_INLINE int32_t tdSRowInitEx(SRowBuilder *pBuilder, void *pBuf, uint32_t allNullLen, uint32_t boundNullLen, int32_t nCols, int32_t nBoundCols, int32_t flen) { - if(tdSRowSetExtendedInfo(pBuilder, allNullLen, boundNullLen, nCols, nBoundCols, flen) < 0){ + if (tdSRowSetExtendedInfo(pBuilder, allNullLen, boundNullLen, nCols, nBoundCols, flen) < 0) { return terrno; } return tdSRowResetBuf(pBuilder, pBuf); @@ -466,10 +517,9 @@ static FORCE_INLINE void tdSRowReset(SRowBuilder *pBuilder) { } // internal func -static FORCE_INLINE int32_t tdAppendColValToTpRow(STSRow *row, void *pBitmap, TDRowValT valType, const void *val, +static FORCE_INLINE int32_t tdAppendColValToTpRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData, int8_t colType, int16_t colIdx, int32_t offset) { - if ((offset < sizeof(TSKEY)) || (colIdx < 1)) { - TASSERT(0); + if ((offset < (int32_t)sizeof(TSKEY)) || (colIdx < 1)) { terrno = TSDB_CODE_INVALID_PARA; return terrno; } @@ -477,16 +527,18 @@ static FORCE_INLINE int32_t tdAppendColValToTpRow(STSRow *row, void *pBitmap, TD --colIdx; #ifdef TD_SUPPORT_BITMAP - if (tdSetBitmapValType(pBitmap, colIdx, valType) != TSDB_CODE_SUCCESS) { + if (tdSetBitmapValType(pBuilder->pBitmap, colIdx, valType) != TSDB_CODE_SUCCESS) { return terrno; } #endif - // 1. No need to set flen part for Null/None, just use bitmap. When upsert for the same primary TS key, the bitmap should - // be updated simultaneously if Norm val overwrite Null/None cols. + STSRow *row = pBuilder->pBuf; + + // 1. No need to set flen part for Null/None, just use bitmap. When upsert for the same primary TS key, the bitmap + // should be updated simultaneously if Norm val overwrite Null/None cols. // 2. When consume STSRow in memory by taos client/tq, the output of Null/None cols should both be Null. if (tdValIsNorm(valType, val, colType)) { - //TODO: The layout of new data types imported since 3.0 like blob/medium blob is the same with binary/nchar. + // TODO: The layout of new data types imported since 3.0 like blob/medium blob is the same with binary/nchar. if (IS_VAR_DATA_TYPE(colType)) { // ts key stored in STSRow.ts *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset) = TD_ROW_LEN(row); @@ -501,12 +553,12 @@ static FORCE_INLINE int32_t tdAppendColValToTpRow(STSRow *row, void *pBitmap, TD #ifdef TD_SUPPORT_BACK2 // NULL/None value else { - //TODO: Null value for new data types imported since 3.0 need to be defined. + // TODO: Null value for new data types imported since 3.0 need to be defined. const void *nullVal = getNullValue(colType); if (IS_VAR_DATA_TYPE(colType)) { // ts key stored in STSRow.ts *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset) = TD_ROW_LEN(row); - + if (isCopyVarData) { memcpy(POINTER_SHIFT(row, TD_ROW_LEN(row)), nullVal, varDataTLen(nullVal)); } @@ -521,10 +573,10 @@ static FORCE_INLINE int32_t tdAppendColValToTpRow(STSRow *row, void *pBitmap, TD } // internal func -static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, TDRowValT valType, const void *val, +static FORCE_INLINE int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData, int8_t colType, int16_t colIdx, int32_t offset, col_id_t colId) { - if ((offset < sizeof(SKvRowIdx)) || (colIdx < 1)) { + if ((offset < (int32_t)sizeof(SKvRowIdx)) || (colIdx < 1)) { TASSERT(0); terrno = TSDB_CODE_INVALID_PARA; return terrno; @@ -532,18 +584,18 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, T offset -= sizeof(SKvRowIdx); --colIdx; - #ifdef TD_SUPPORT_BITMAP - if (tdSetBitmapValType(pBitmap, colIdx, valType) != TSDB_CODE_SUCCESS) { + if (tdSetBitmapValType(pBuilder->pBitmap, colIdx, valType) != TSDB_CODE_SUCCESS) { return terrno; } #endif + STSRow *row = pBuilder->pBuf; // No need to store None/Null values. if (tdValIsNorm(valType, val, colType)) { // ts key stored in STSRow.ts SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(row->data, offset); - char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row)); + char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row)); pColIdx->colId = colId; pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN @@ -561,7 +613,7 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, T // NULL/None value else { SKvRowIdx *pColIdx = (SKvRowIdx *)POINTER_SHIFT(row->data, offset); - char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row)); + char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row)); pColIdx->colId = colId; pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN const void *nullVal = getNullValue(colType); @@ -587,16 +639,16 @@ static FORCE_INLINE int32_t tdAppendColValToKvRow(STSRow *row, void *pBitmap, T * @param pBuilder * @param colId start from PRIMARYKEY_TIMESTAMP_COL_ID * @param colType - * @param val * @param valType + * @param val + * @param isCopyVarData * @param offset * @param colIdx sorted column index, start from 0 * @return FORCE_INLINE */ -static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t colId, int8_t colType, const void *val, - TDRowValT valType, int32_t offset, int16_t colIdx) { +static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t colId, int8_t colType, TDRowValT valType, + const void *val, bool isCopyVarData, int32_t offset, int16_t colIdx) { STSRow *pRow = pBuilder->pBuf; - void * pBitmap = NULL; if (!val) { #ifdef TD_SUPPORT_BITMAP if (tdValTypeIsNorm(valType)) { @@ -611,7 +663,7 @@ static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t c } // TS KEY is stored in STSRow.ts and not included in STSRow.data field. if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - TD_ROW_TSKEY(pRow) = *(TSKEY *)val; + TD_ROW_KEY(pRow) = *(TSKEY *)val; // The primary TS key is Norm all the time, thus its valType is not stored in bitmap. // #ifdef TD_SUPPORT_BITMAP // pBitmap = tdGetBitmapAddr(pRow, pRow->type, pBuilder->flen, pRow->ncols); @@ -622,13 +674,10 @@ static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t c return TSDB_CODE_SUCCESS; } // TODO: We can avoid the type judegement by FP, but would prevent the inline scheme. - // typedef int (*tdAppendColValToSRowFp)(STSRow *pRow, void *pBitmap, int16_t colId, int8_t colType, - // const void *val, int8_t valType, int32_t tOffset, int16_t - // colIdx); if (TD_IS_TP_ROW(pRow)) { - tdAppendColValToTpRow(pRow, pBitmap, valType, val, true, colType, colIdx, offset); + tdAppendColValToTpRow(pBuilder, valType, val, true, colType, colIdx, offset); } else { - tdAppendColValToKvRow(pRow, pBitmap, valType, val, true, colType, colIdx, offset, colId); + tdAppendColValToKvRow(pBuilder, valType, val, true, colType, colIdx, offset, colId); } return TSDB_CODE_SUCCESS; } @@ -811,7 +860,6 @@ static FORCE_INLINE bool tdGetTpRowDataOfCol(STSRowIter *pIter, col_type_t colTy return true; } - // internal static FORCE_INLINE bool tdGetKvRowValOfColEx(STSRowIter *pIter, col_id_t colId, col_type_t colType, col_id_t *nIdx, SCellVal *pVal) { @@ -892,6 +940,65 @@ static FORCE_INLINE bool tdSTSRowIterNext(STSRowIter *pIter, col_id_t colId, col return true; } +STSRow *mergeTwoRows(void *buffer, STSRow *row1, STSRow *row2, STSchema *pSchema1, STSchema *pSchema2); + +// Get the data pointer from a column-wised data +static FORCE_INLINE int32_t tdGetColDataOfRow(SCellVal *pVal, SDataCol *pCol, int row) { + if (isAllRowsNone(pCol)) { + pVal->valType = TD_VTYPE_NULL; +#ifdef TD_SUPPORT_READ2 + pVal->val = (void*)getNullValue(pCol->type); +#else + pVal->val = NULL; +#endif + return TSDB_CODE_SUCCESS; + } + if (tdGetBitmapValType(pCol->pBitmap, row, &(pVal->valType)) < 0) { + return terrno; + } + if ((pCol->bitmap == 1) || tdValTypeIsNorm(pVal->valType)) { + if (IS_VAR_DATA_TYPE(pCol->type)) { + pVal->val = POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); + } else { + pVal->val = POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row); + } + } else { + pVal->valType = TD_VTYPE_NULL; +#ifdef TD_SUPPORT_READ2 + pVal->val = (void*)getNullValue(pCol->type); +#else + pVal->val = NULL; +#endif + } + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { + ASSERT(rows > 0); + int32_t result = 0; + + if (IS_VAR_DATA_TYPE(pDataCol->type)) { + result += pDataCol->dataOff[rows - 1]; + SCellVal val = {0}; + if (tdGetColDataOfRow(&val, pDataCol, rows - 1) < 0) { + TASSERT(0); + } + + // Currently, count the varDataTLen in of Null/None cols considering back compatibility test for 2.4 + result += varDataTLen(val.val); + // TODO: later on, don't save Null/None for VarDataT for 3.0 + // if (tdValTypeIsNorm(val.valType)) { + // result += varDataTLen(val.val); + // } + } else { + result += TYPE_BYTES[pDataCol->type] * rows; + } + + ASSERT(pDataCol->len == result); + + return result; +} + #ifdef TROW_ORIGIN_HZ typedef struct { uint32_t nRows; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 1e68faa4f4..5b31624c94 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -565,12 +565,18 @@ TEST(testCase, insert_test) { #endif -#if 0 +#if 1 TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index dd8c728269..41d911e4c7 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -20,17 +20,23 @@ #include "tarray.h" static void dataColSetNEleNull(SDataCol *pCol, int nEle); +#if 0 static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows, bool forceSetNull); - +#endif int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { int spaceNeeded = pCol->bytes * maxPoints; if(IS_VAR_DATA_TYPE(pCol->type)) { spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; } #ifdef TD_SUPPORT_BITMAP - spaceNeeded += (int)TD_BITMAP_BYTES(maxPoints); + int32_t nBitmapBytes = (int32_t)TD_BITMAP_BYTES(maxPoints); + spaceNeeded += (int)nBitmapBytes; + // TODO: Currently, the compression of bitmap parts is affiliated to the column data parts, thus allocate 1 more + // TYPE_BYTES as to comprise complete TYPE_BYTES. Otherwise, invalid read/write would be triggered. + spaceNeeded += TYPE_BYTES[pCol->type]; #endif + if(pCol->spaceSize < spaceNeeded) { void* ptr = realloc(pCol->pData, spaceNeeded); if(ptr == NULL) { @@ -42,16 +48,17 @@ int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { pCol->spaceSize = spaceNeeded; } } - if (IS_VAR_DATA_TYPE(pCol->type)) { - pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints); #ifdef TD_SUPPORT_BITMAP - pCol->pBitmap = POINTER_SHIFT(pCol->dataOff, sizeof(VarDataOffsetT) * maxPoints); -#endif - } -#ifdef TD_SUPPORT_BITMAP - else { + if (IS_VAR_DATA_TYPE(pCol->type)) { + pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints); + pCol->dataOff = POINTER_SHIFT(pCol->pBitmap, nBitmapBytes); + } else { pCol->pBitmap = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints); } +#else + if (IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints); + } #endif return 0; } @@ -205,6 +212,7 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) { return pSchema; } +#if 0 /** * Initialize a data row */ @@ -245,12 +253,13 @@ SMemRow tdMemRowDup(SMemRow row) { memRowCpy(trow, row); return trow; } +#endif void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { pDataCol->type = colType(pCol); pDataCol->colId = colColId(pCol); pDataCol->bytes = colBytes(pCol); - pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; + pDataCol->offset = colOffset(pCol) + 0; //TD_DATA_ROW_HEAD_SIZE; pDataCol->len = 0; } @@ -326,7 +335,7 @@ static void dataColSetNEleNull(SDataCol *pCol, int nEle) { } } -void dataColSetOffset(SDataCol *pCol, int nEle) { +void *dataColSetOffset(SDataCol *pCol, int nEle) { ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR))); void *tptr = pCol->pData; @@ -338,6 +347,7 @@ void dataColSetOffset(SDataCol *pCol, int nEle) { offset += varDataTLen(tptr); tptr = POINTER_SHIFT(tptr, varDataTLen(tptr)); } + return POINTER_SHIFT(tptr, varDataTLen(tptr)); } SDataCols *tdNewDataCols(int maxCols, int maxRows) { @@ -455,7 +465,7 @@ void tdResetDataCols(SDataCols *pCols) { } } } - +#if 0 static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row)); @@ -628,6 +638,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i ASSERT(target->numOfRows <= target->maxPoints); } } +#endif SKVRow tdKVRowDup(SKVRow row) { SKVRow trow = malloc(kvRowLen(row)); @@ -787,7 +798,7 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { return row; } - +#if 0 SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) { #if 0 ASSERT(memRowKey(row1) == memRowKey(row2)); @@ -881,3 +892,4 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch taosArrayDestroy(stashRow); return buffer; } +#endif \ No newline at end of file diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e45b61554c..18c2990f52 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -67,19 +67,19 @@ int tInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { if (pBlock->dataLen <= 0) return -1; pIter->totalLen = pBlock->dataLen; pIter->len = 0; - pIter->row = (SMemRow)(pBlock->data + pBlock->schemaLen); + pIter->row = (STSRow *)(pBlock->data + pBlock->schemaLen); return 0; } -SMemRow tGetSubmitBlkNext(SSubmitBlkIter *pIter) { - SMemRow row = pIter->row; +STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { + STSRow *row = pIter->row; if (pIter->len >= pIter->totalLen) { return NULL; } else { - pIter->len += memRowTLen(row); + pIter->len += TD_ROW_LEN(row); if (pIter->len < pIter->totalLen) { - pIter->row = POINTER_SHIFT(row, memRowTLen(row)); + pIter->row = POINTER_SHIFT(row, TD_ROW_LEN(row)); } return row; } diff --git a/source/common/src/trow.c b/source/common/src/trow.c index b0c14a3159..c9749351e1 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -14,6 +14,7 @@ */ #include "trow.h" +#include "tarray.h" const uint8_t tdVTypeByte[3] = { TD_VTYPE_NORM_BYTE, // TD_VTYPE_NORM @@ -21,11 +22,11 @@ const uint8_t tdVTypeByte[3] = { TD_VTYPE_NULL_BYTE, // TD_VTYPE_NULL }; -static void dataColSetNEleNull(SDataCol *pCol, int nEle); +// static void dataColSetNEleNull(SDataCol *pCol, int nEle); static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, int limit2, int tRows, bool forceSetNull); -static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { +static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index, bool setBitmap) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->dataOff[index] = pCol->len; char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); @@ -35,22 +36,25 @@ static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes); pCol->len += TYPE_BYTES[pCol->type]; } -} - -static void dataColSetNEleNull(SDataCol *pCol, int nEle) { - if (IS_VAR_DATA_TYPE(pCol->type)) { - pCol->len = 0; - for (int i = 0; i < nEle; i++) { - dataColSetNullAt(pCol, i); - } - } else { - setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); - pCol->len = TYPE_BYTES[pCol->type] * nEle; + if (setBitmap) { + tdSetBitmapValType(pCol->pBitmap, index, TD_VTYPE_NONE); } } -static FORCE_INLINE int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDRowValT valType) { - TASSERT(valType <= TD_VTYPE_NULL); +// static void dataColSetNEleNull(SDataCol *pCol, int nEle) { +// if (IS_VAR_DATA_TYPE(pCol->type)) { +// pCol->len = 0; +// for (int i = 0; i < nEle; i++) { +// dataColSetNullAt(pCol, i); +// } +// } else { +// setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); +// pCol->len = TYPE_BYTES[pCol->type] * nEle; +// } +// } + +int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDRowValT valType) { + TASSERT(valType < TD_VTYPE_MAX); int16_t nBytes = nEle / TD_VTYPE_PARTS; for (int i = 0; i < nBytes; ++i) { *(uint8_t *)pBitmap = tdVTypeByte[valType]; @@ -61,9 +65,10 @@ static FORCE_INLINE int32_t tdSetBitmapValTypeN(void *pBitmap, int16_t nEle, TDR for (int j = 0; j < nLeft; ++j) { tdSetBitmapValType(pBitmap, j, valType); } + return TSDB_CODE_SUCCESS; } -static FORCE_INLINE void dataColSetNoneAt(SDataCol *pCol, int index) { +static FORCE_INLINE void dataColSetNoneAt(SDataCol *pCol, int index, bool setBitmap) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->dataOff[index] = pCol->len; char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); @@ -73,13 +78,16 @@ static FORCE_INLINE void dataColSetNoneAt(SDataCol *pCol, int index) { setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes); pCol->len += TYPE_BYTES[pCol->type]; } + if(setBitmap) { + tdSetBitmapValType(pCol->pBitmap, index, TD_VTYPE_NONE); + } } static void dataColSetNEleNone(SDataCol *pCol, int nEle) { if (IS_VAR_DATA_TYPE(pCol->type)) { pCol->len = 0; for (int i = 0; i < nEle; ++i) { - dataColSetNoneAt(pCol, i); + dataColSetNoneAt(pCol, i, false); } } else { setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); @@ -108,395 +116,36 @@ int trbWriteCol(SRowBuilder *pRB, void *pData, col_id_t cid) { return 0; } -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#include "tdataformat.h" -#include "ulog.h" -#include "talgo.h" -#include "tcoding.h" -#include "wchar.h" -#include "tarray.h" - - - -/** - * Duplicate the schema and return a new object - */ -STSchema *tdDupSchema(const STSchema *pSchema) { - - int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema); - STSchema *tSchema = (STSchema *)malloc(tlen); - if (tSchema == NULL) return NULL; - - memcpy((void *)tSchema, (void *)pSchema, tlen); - - return tSchema; -} - -/** - * Encode a schema to dst, and return the next pointer - */ -int tdEncodeSchema(void **buf, STSchema *pSchema) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema)); - tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema)); - - for (int i = 0; i < schemaNCols(pSchema); i++) { - STColumn *pCol = schemaColAt(pSchema, i); - tlen += taosEncodeFixedI8(buf, colType(pCol)); - tlen += taosEncodeFixedI16(buf, colColId(pCol)); - tlen += taosEncodeFixedI16(buf, colBytes(pCol)); - } - - return tlen; -} - -/** - * Decode a schema from a binary. - */ -void *tdDecodeSchema(void *buf, STSchema **pRSchema) { - int version = 0; - int numOfCols = 0; - STSchemaBuilder schemaBuilder; - - buf = taosDecodeFixedI32(buf, &version); - buf = taosDecodeFixedI32(buf, &numOfCols); - - if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL; - - for (int i = 0; i < numOfCols; i++) { - int8_t type = 0; - int16_t colId = 0; - int16_t bytes = 0; - buf = taosDecodeFixedI8(buf, &type); - buf = taosDecodeFixedI16(buf, &colId); - buf = taosDecodeFixedI16(buf, &bytes); - if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) { - tdDestroyTSchemaBuilder(&schemaBuilder); - return NULL; - } - } - - *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder); - tdDestroyTSchemaBuilder(&schemaBuilder); - return buf; -} - -int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { - if (pBuilder == NULL) return -1; - - pBuilder->tCols = 256; - pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols); - if (pBuilder->columns == NULL) return -1; - - tdResetTSchemaBuilder(pBuilder, version); - return 0; -} - -void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) { - if (pBuilder) { - tfree(pBuilder->columns); - } -} - -void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { - pBuilder->nCols = 0; - pBuilder->tlen = 0; - pBuilder->flen = 0; - pBuilder->vlen = 0; - pBuilder->version = version; -} - -int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) { - if (!isValidDataType(type)) return -1; - - if (pBuilder->nCols >= pBuilder->tCols) { - pBuilder->tCols *= 2; - STColumn* columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols); - if (columns == NULL) return -1; - pBuilder->columns = columns; - } - - STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]); - colSetType(pCol, type); - colSetColId(pCol, colId); - if (pBuilder->nCols == 0) { - colSetOffset(pCol, 0); - } else { - STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]); - colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]); - } - - if (IS_VAR_DATA_TYPE(type)) { - colSetBytes(pCol, bytes); - pBuilder->tlen += (TYPE_BYTES[type] + bytes); - pBuilder->vlen += bytes - sizeof(VarDataLenT); - } else { - colSetBytes(pCol, TYPE_BYTES[type]); - pBuilder->tlen += TYPE_BYTES[type]; - pBuilder->vlen += TYPE_BYTES[type]; - } - - pBuilder->nCols++; - pBuilder->flen += TYPE_BYTES[type]; - - ASSERT(pCol->offset < pBuilder->flen); - - return 0; -} - -STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) { - if (pBuilder->nCols <= 0) return NULL; - - int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols; - - STSchema *pSchema = (STSchema *)malloc(tlen); - if (pSchema == NULL) return NULL; - - schemaVersion(pSchema) = pBuilder->version; - schemaNCols(pSchema) = pBuilder->nCols; - schemaTLen(pSchema) = pBuilder->tlen; - schemaFLen(pSchema) = pBuilder->flen; - schemaVLen(pSchema) = pBuilder->vlen; - - memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols); - - return pSchema; -} - -/** - * Initialize a data row - */ -void tdInitDataRow(SDataRow row, STSchema *pSchema) { - dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); - dataRowSetVersion(row, schemaVersion(pSchema)); -} - -SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { - int32_t size = dataRowMaxBytesFromSchema(pSchema); - - SDataRow row = malloc(size); - if (row == NULL) return NULL; - - tdInitDataRow(row, pSchema); - return row; -} - -/** - * Free the SDataRow object - */ -void tdFreeDataRow(SDataRow row) { - if (row) free(row); -} - -SDataRow tdDataRowDup(SDataRow row) { - SDataRow trow = malloc(dataRowLen(row)); - if (trow == NULL) return NULL; - - dataRowCpy(trow, row); - return trow; -} +#endif -SMemRow tdMemRowDup(SMemRow row) { - SMemRow trow = malloc(memRowTLen(row)); +STSRow* tdRowDup(STSRow *row) { + STSRow* trow = malloc(TD_ROW_LEN(row)); if (trow == NULL) return NULL; - memRowCpy(trow, row); + tdRowCpy(trow, row); return trow; } -void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { - pDataCol->type = colType(pCol); - pDataCol->colId = colColId(pCol); - pDataCol->bytes = colBytes(pCol); - pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; - - pDataCol->len = 0; -} - -#endif - - - -#if 0 -static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) { - if (IS_VAR_DATA_TYPE(pCol->type)) { - return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); - } else { - return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row); - } -} - -bool isNEleNull(SDataCol *pCol, int nEle) { - if(isAllRowsNull(pCol)) return true; - for (int i = 0; i < nEle; i++) { - if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false; - } - return true; -} - - - -void dataColSetOffset(SDataCol *pCol, int nEle) { - ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR))); - - void *tptr = pCol->pData; - // char *tptr = (char *)(pCol->pData); - - VarDataOffsetT offset = 0; - for (int i = 0; i < nEle; i++) { - pCol->dataOff[i] = offset; - offset += varDataTLen(tptr); - tptr = POINTER_SHIFT(tptr, varDataTLen(tptr)); - } -} - -SDataCols *tdNewDataCols(int maxCols, int maxRows) { - SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols)); - if (pCols == NULL) { - uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno)); - return NULL; - } - - pCols->maxPoints = maxRows; - pCols->maxCols = maxCols; - pCols->numOfRows = 0; - pCols->numOfCols = 0; - - if (maxCols > 0) { - pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol)); - if (pCols->cols == NULL) { - uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, - strerror(errno)); - tdFreeDataCols(pCols); - return NULL; - } - int i; - for(i = 0; i < maxCols; i++) { - pCols->cols[i].spaceSize = 0; - pCols->cols[i].len = 0; - pCols->cols[i].pData = NULL; - pCols->cols[i].dataOff = NULL; - } - } - - return pCols; -} - -int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { - int i; - int oldMaxCols = pCols->maxCols; - if (schemaNCols(pSchema) > oldMaxCols) { - pCols->maxCols = schemaNCols(pSchema); - void* ptr = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); - if (ptr == NULL) return -1; - pCols->cols = ptr; - for(i = oldMaxCols; i < pCols->maxCols; i++) { - pCols->cols[i].pData = NULL; - pCols->cols[i].dataOff = NULL; - pCols->cols[i].spaceSize = 0; - } - } - - tdResetDataCols(pCols); - pCols->numOfCols = schemaNCols(pSchema); - - for (i = 0; i < schemaNCols(pSchema); i++) { - dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints); - } - - return 0; -} - -SDataCols *tdFreeDataCols(SDataCols *pCols) { - int i; - if (pCols) { - if(pCols->cols) { - int maxCols = pCols->maxCols; - for(i = 0; i < maxCols; i++) { - SDataCol *pCol = &pCols->cols[i]; - tfree(pCol->pData); - } - free(pCols->cols); - pCols->cols = NULL; - } - free(pCols); - } - return NULL; -} - -SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { - SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints); - if (pRet == NULL) return NULL; - - pRet->numOfCols = pDataCols->numOfCols; - pRet->sversion = pDataCols->sversion; - if (keepData) pRet->numOfRows = pDataCols->numOfRows; - - for (int i = 0; i < pDataCols->numOfCols; i++) { - pRet->cols[i].type = pDataCols->cols[i].type; - pRet->cols[i].colId = pDataCols->cols[i].colId; - pRet->cols[i].bytes = pDataCols->cols[i].bytes; - pRet->cols[i].offset = pDataCols->cols[i].offset; - - if (keepData) { - if (pDataCols->cols[i].len > 0) { - if(tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints) < 0) { - tdFreeDataCols(pRet); - return NULL; - } - pRet->cols[i].len = pDataCols->cols[i].len; - memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); - if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { - int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints; - memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize); - } - } - } - } - - return pRet; -} - -void tdResetDataCols(SDataCols *pCols) { - if (pCols != NULL) { - pCols->numOfRows = 0; - for (int i = 0; i < pCols->maxCols; i++) { - dataColReset(pCols->cols + i); - } - } -} -#endif - int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int numOfRows, int maxPoints) { - ASSERT(pCol != NULL); + TASSERT(pCol != NULL); - // Assume that, the columns not specified during insert/upsert is None. + // Assume that the columns not specified during insert/upsert mean None. if (isAllRowsNone(pCol)) { if (tdValIsNone(valType)) { // all None value yet, just return return 0; } - if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1; + if (tdAllocMemForCol(pCol, maxPoints) < 0) return -1; if (numOfRows > 0) { // Find the first not None value, fill all previous values as None dataColSetNEleNone(pCol, numOfRows); } } if (!tdValTypeIsNorm(valType)) { + // TODO: + // 1. back compatibility and easy to debug with codes of 2.0 to save NULL values. + // 2. later on, considering further optimization, don't save Null/None for VarType. val = getNullValue(pCol->type); } if (IS_VAR_DATA_TYPE(pCol->type)) { @@ -519,7 +168,7 @@ int tdAppendValToDataCol(SDataCol *pCol, TDRowValT valType, const void *val, int // internal static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) { - ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_TSKEY(pRow)); + ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow)); int rcol = 1; int dcol = 1; @@ -527,7 +176,7 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols SDataCol *pDataCol = &(pCols->cols[0]); if (pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - tdAppendValToDataCol(pDataCol, pRow->ts, TD_VTYPE_NORM, pCols->numOfRows, pCols->maxPoints); + tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints); } while (dcol < pCols->numOfCols) { @@ -560,7 +209,7 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols } // internal static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCols) { - ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_TSKEY(pRow)); + ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < TD_ROW_KEY(pRow)); int rcol = 0; int dcol = 1; @@ -570,7 +219,7 @@ static int32_t tdAppendKvRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols SDataCol *pDataCol = &(pCols->cols[0]); if (pDataCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - tdAppendValToDataCol(pDataCol, pRow->ts, TD_VTYPE_NORM, pCols->numOfRows, pCols->maxPoints); + tdAppendValToDataCol(pDataCol, TD_VTYPE_NORM, &pRow->ts, pCols->numOfRows, pCols->maxPoints); } while (dcol < pCols->numOfCols) { @@ -625,8 +274,6 @@ int32_t tdAppendSTSRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols *pCol return TSDB_CODE_SUCCESS; } -#if 0 - int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) { ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); ASSERT(target->numOfCols == source->numOfCols); @@ -643,11 +290,14 @@ int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int * for (int i = 0; i < rowsToMerge; i++) { for (int j = 0; j < source->numOfCols; j++) { if (source->cols[j].len > 0 || target->cols[j].len > 0) { - dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows, - target->maxPoints); + SCellVal sVal = {0}; + if (tdGetColDataOfRow(&sVal, source->cols + j, i + (*pOffset)) < 0) { + TASSERT(0); + } + tdAppendValToDataCol(target->cols + j, sVal.valType, sVal.val, target->numOfRows, target->maxPoints); } } - target->numOfRows++; + ++target->numOfRows; } (*pOffset) += rowsToMerge; } else { @@ -679,7 +329,7 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1); TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1); TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2); - TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2); + // TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2); ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1))); @@ -687,25 +337,34 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i for (int i = 0; i < src1->numOfCols; i++) { ASSERT(target->cols[i].type == src1->cols[i].type); if (src1->cols[i].len > 0 || target->cols[i].len > 0) { - dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, - target->maxPoints); + SCellVal sVal = {0}; + if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1) < 0) { + TASSERT(0); + } + tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints); } } target->numOfRows++; (*iter1)++; } else if (key1 >= key2) { - if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) { + // if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) { + if ((key1 > key2) || (key1 == key2)) { for (int i = 0; i < src2->numOfCols; i++) { + SCellVal sVal = {0}; ASSERT(target->cols[i].type == src2->cols[i].type); if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) { - dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, - target->maxPoints); - } else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { - dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, - target->maxPoints); - } else if(target->cols[i].len > 0) { - dataColSetNullAt(&target->cols[i], target->numOfRows); + if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1) < 0) { + TASSERT(0); + } + tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints); + } else if (!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { + if (tdGetColDataOfRow(&sVal, src1->cols + i, *iter1) < 0) { + TASSERT(0); + } + tdAppendValToDataCol(&(target->cols[i]), sVal.valType, sVal.val, target->numOfRows, target->maxPoints); + } else if (target->cols[i].len > 0) { + dataColSetNullAt(&target->cols[i], target->numOfRows, true); } } target->numOfRows++; @@ -719,180 +378,24 @@ static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, i } } -SKVRow tdKVRowDup(SKVRow row) { - SKVRow trow = malloc(kvRowLen(row)); - if (trow == NULL) return NULL; - kvRowCpy(trow, row); - return trow; -} -static int compareColIdx(const void* a, const void* b) { - const SColIdx* x = (const SColIdx*)a; - const SColIdx* y = (const SColIdx*)b; - if (x->colId > y->colId) { - return 1; - } - if (x->colId < y->colId) { - return -1; - } - return 0; -} - -void tdSortKVRowByColIdx(SKVRow row) { - qsort(kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), compareColIdx); -} - -int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) { - SColIdx *pColIdx = NULL; - SKVRow row = *orow; - SKVRow nrow = NULL; - void * ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE); - - if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) { // need to add a column value to the row - int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; - int nRowLen = kvRowLen(row) + sizeof(SColIdx) + diff; - int oRowCols = kvRowNCols(row); - - ASSERT(diff > 0); - nrow = malloc(nRowLen); - if (nrow == NULL) return -1; - - kvRowSetLen(nrow, nRowLen); - kvRowSetNCols(nrow, oRowCols + 1); - - memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * oRowCols); - memcpy(kvRowValues(nrow), kvRowValues(row), kvRowValLen(row)); - - pColIdx = kvRowColIdxAt(nrow, oRowCols); - pColIdx->colId = colId; - pColIdx->offset = kvRowValLen(row); - - memcpy(kvRowColVal(nrow, pColIdx), value, diff); // copy new value - - tdSortKVRowByColIdx(nrow); - - *orow = nrow; - free(row); - } else { - ASSERT(((SColIdx *)ptr)->colId == colId); - if (IS_VAR_DATA_TYPE(type)) { - void *pOldVal = kvRowColVal(row, (SColIdx *)ptr); - - if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place - memcpy(pOldVal, value, varDataTLen(value)); - } else { // need to reallocate the memory - int16_t nlen = kvRowLen(row) + (varDataTLen(value) - varDataTLen(pOldVal)); - ASSERT(nlen > 0); - nrow = malloc(nlen); - if (nrow == NULL) return -1; - - kvRowSetLen(nrow, nlen); - kvRowSetNCols(nrow, kvRowNCols(row)); - - int zsize = sizeof(SColIdx) * kvRowNCols(row) + ((SColIdx *)ptr)->offset; - memcpy(kvRowColIdx(nrow), kvRowColIdx(row), zsize); - memcpy(kvRowColVal(nrow, ((SColIdx *)ptr)), value, varDataTLen(value)); - // Copy left value part - int lsize = kvRowLen(row) - TD_KV_ROW_HEAD_SIZE - zsize - varDataTLen(pOldVal); - if (lsize > 0) { - memcpy(POINTER_SHIFT(nrow, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(value)), - POINTER_SHIFT(row, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(pOldVal)), lsize); - } - - for (int i = 0; i < kvRowNCols(nrow); i++) { - pColIdx = kvRowColIdxAt(nrow, i); - - if (pColIdx->offset > ((SColIdx *)ptr)->offset) { - pColIdx->offset = pColIdx->offset - varDataTLen(pOldVal) + varDataTLen(value); - } - } - - *orow = nrow; - free(row); - } - } else { - memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]); - } - } - - return 0; -} - -int tdEncodeKVRow(void **buf, SKVRow row) { - // May change the encode purpose - if (buf != NULL) { - kvRowCpy(*buf, row); - *buf = POINTER_SHIFT(*buf, kvRowLen(row)); - } - - return kvRowLen(row); -} - -void *tdDecodeKVRow(void *buf, SKVRow *row) { - *row = tdKVRowDup(buf); - if (*row == NULL) return NULL; - return POINTER_SHIFT(buf, kvRowLen(*row)); -} - -int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) { - pBuilder->tCols = 128; - pBuilder->nCols = 0; - pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols); - if (pBuilder->pColIdx == NULL) return -1; - pBuilder->alloc = 1024; - pBuilder->size = 0; - pBuilder->buf = malloc(pBuilder->alloc); - if (pBuilder->buf == NULL) { - free(pBuilder->pColIdx); - return -1; - } - return 0; -} - -void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) { - tfree(pBuilder->pColIdx); - tfree(pBuilder->buf); -} - -void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) { - pBuilder->nCols = 0; - pBuilder->size = 0; -} - -SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { - int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; - if (tlen == 0) return NULL; - - tlen += TD_KV_ROW_HEAD_SIZE; - - SKVRow row = malloc(tlen); - if (row == NULL) return NULL; - - kvRowSetNCols(row, pBuilder->nCols); - kvRowSetLen(row, tlen); - - memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); - memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); - - return row; -} - -SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) { +STSRow* mergeTwoRows(void *buffer, STSRow* row1, STSRow *row2, STSchema *pSchema1, STSchema *pSchema2) { #if 0 - ASSERT(memRowKey(row1) == memRowKey(row2)); - ASSERT(schemaVersion(pSchema1) == memRowVersion(row1)); - ASSERT(schemaVersion(pSchema2) == memRowVersion(row2)); + ASSERT(TD_ROW_KEY(row1) == TD_ROW_KEY(row2)); + ASSERT(schemaVersion(pSchema1) == TD_ROW_SVER(row1)); + ASSERT(schemaVersion(pSchema2) == TD_ROW_SVER(row2)); ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2)); #endif +#if 0 SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo)); if (stashRow == NULL) { return NULL; } - SMemRow pRow = buffer; - SDataRow dataRow = memRowDataBody(pRow); + STSRow pRow = buffer; + STpRow dataRow = memRowDataBody(pRow); memRowSetType(pRow, SMEM_ROW_DATA); dataRowSetVersion(dataRow, schemaVersion(pSchema1)); // use latest schema version dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen)); @@ -946,12 +449,12 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch ++i; // next col } - dataLen = memRowTLen(pRow); + dataLen = TD_ROW_LEN(pRow); if (kvLen < dataLen) { // scan stashRow and generate SKVRow memset(buffer, 0, sizeof(dataLen)); - SMemRow tRow = buffer; + STSRow tRow = buffer; memRowSetType(tRow, SMEM_ROW_KV); SKVRow kvRow = (SKVRow)memRowKvBody(tRow); int16_t nKvNCols = (int16_t) taosArrayGetSize(stashRow); @@ -966,10 +469,10 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset); toffset += sizeof(SColIdx); } - ASSERT(kvLen == memRowTLen(tRow)); + ASSERT(kvLen == TD_ROW_LEN(tRow)); } taosArrayDestroy(stashRow); return buffer; + #endif + return NULL; } - -#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/tsdbMemTable.h b/source/dnode/vnode/src/inc/tsdbMemTable.h index d2e3d488d0..c6fbdb407c 100644 --- a/source/dnode/vnode/src/inc/tsdbMemTable.h +++ b/source/dnode/vnode/src/inc/tsdbMemTable.h @@ -58,20 +58,20 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmi int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); -static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator *pIter) { +static FORCE_INLINE STSRow *tsdbNextIterRow(SSkipListIterator *pIter) { if (pIter == NULL) return NULL; SSkipListNode *node = tSkipListIterGet(pIter); if (node == NULL) return NULL; - return (SMemRow)SL_GET_NODE_DATA(node); + return (STSRow *)SL_GET_NODE_DATA(node); } static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) { - SMemRow row = tsdbNextIterRow(pIter); + STSRow *row = tsdbNextIterRow(pIter); if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL; - return memRowKey(row); + return TD_ROW_KEY(row); } #ifdef __cplusplus diff --git a/source/dnode/vnode/src/inc/tsdbReadImpl.h b/source/dnode/vnode/src/inc/tsdbReadImpl.h index 4f1452019d..990a666936 100644 --- a/source/dnode/vnode/src/inc/tsdbReadImpl.h +++ b/source/dnode/vnode/src/inc/tsdbReadImpl.h @@ -75,8 +75,8 @@ typedef struct { typedef struct { int16_t colId; - int16_t bitmap : 1; // 0: no bitmap if all rows not null, 1: has bitmap if has null rows - int16_t reserve : 15; + uint16_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM + uint16_t reserve : 15; int32_t len; uint32_t type : 8; uint32_t offset : 24; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a625980505..f65c0d9893 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -448,19 +448,27 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { taosArrayPush(pArray, &colInfo); } - SMemRow row; - int32_t kvIdx = 0; + STSRowIter iter = {0}; + tdSTSRowIterInit(&iter, pTschema); + STSRow* row; + // int32_t kvIdx = 0; int32_t curRow = 0; tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { + tdSTSRowIterReset(&iter, row); // get all wanted col of that block for (int32_t i = 0; i < colNumNeed; i++) { SColumnInfoData* pColData = taosArrayGet(pArray, i); STColumn* pCol = schemaColAt(pTschema, i); // TODO ASSERT(pCol->colId == pColData->info.colId); - void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); - memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), val, pCol->bytes); + // void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); + SCellVal sVal = {0}; + if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) { + // TODO: reach end + break; + } + memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes); } curRow++; } diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 441ff7230a..6a5fe0c139 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -1162,6 +1162,13 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols * (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), &(pBlockCol->numOfNull)); + if (pBlockCol->numOfNull == 0) { + pBlockCol->bitmap = 1; + } else { + pBlockCol->bitmap = 0; + } + } else { + pBlockCol->bitmap = 0; } nColsNotAllNull++; } @@ -1174,6 +1181,9 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols * int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull); int32_t lsize = tsize; int32_t keyLen = 0; + int32_t nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite); + int32_t tBitmaps = 0; + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { // All not NULL columns finish if (ncol != 0 && tcol >= nColsNotAllNull) break; @@ -1185,9 +1195,23 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols * int32_t flen; // final length int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); + #ifdef TD_SUPPORT_BITMAP - tlen += (int32_t)TD_BITMAP_BYTES(rowsToWrite); + int32_t tBitmaps = 0; + if ((ncol != 0) && (pBlockCol->bitmap == 0)) { + if (IS_VAR_DATA_TYPE(pDataCol->type)) { + tBitmaps = nBitmaps; + tlen += tBitmaps; + } else { + tBitmaps = (int32_t)ceil((double)nBitmaps / TYPE_BYTES[pDataCol->type]); + tlen += tBitmaps * TYPE_BYTES[pDataCol->type]; + } + // move bitmap parts ahead + // TODO: put bitmap part to the 1st location(pBitmap points to pData) to avoid the memmove + memcpy(POINTER_SHIFT(pDataCol->pData, pDataCol->len), pDataCol->pBitmap, nBitmaps); + } #endif + void * tptr; // Make room @@ -1204,7 +1228,7 @@ int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols * // Compress or just copy if (pCfg->compression) { - flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, + flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite + tBitmaps, tptr, tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf, tlen + COMP_OVERFLOW_BYTES); } else { @@ -1495,11 +1519,11 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt while (true) { key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); - SMemRow row = tsdbNextIterRow(pCommitIter->pIter); - if (row == NULL || memRowKey(row) > maxKey) { + STSRow *row = tsdbNextIterRow(pCommitIter->pIter); + if (row == NULL || TD_ROW_KEY(row) > maxKey) { key2 = INT64_MAX; } else { - key2 = memRowKey(row); + key2 = TD_ROW_KEY(row); } if (key1 == INT64_MAX && key2 == INT64_MAX) break; @@ -1507,19 +1531,22 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt if (key1 < key2) { for (int i = 0; i < pDataCols->numOfCols; i++) { // TODO: dataColAppendVal may fail - dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, - pTarget->maxPoints); + SCellVal sVal = {0}; + if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter) < 0) { + TASSERT(0); + } + tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints); } pTarget->numOfRows++; (*iter)++; } else if (key1 > key2) { - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); + if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); ASSERT(pSchema != NULL); } - tdAppendMemRowToDataCol(row, pSchema, pTarget, true); + tdAppendSTSRowToDataCol(row, pSchema, pTarget, true); tSkipListIterNext(pCommitIter->pIter); } else { @@ -1527,20 +1554,23 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt // copy disk data for (int i = 0; i < pDataCols->numOfCols; i++) { // TODO: dataColAppendVal may fail - dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, - pTarget->maxPoints); + SCellVal sVal = {0}; + if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter) < 0) { + TASSERT(0); + } + tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints); } if (update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; } if (update != TD_ROW_DISCARD_UPDATE) { // copy mem data - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); + if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); ASSERT(pSchema != NULL); } - tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); + tdAppendSTSRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); } (*iter)++; tSkipListIterNext(pCommitIter->pIter); diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index 4da1e3e428..812ec67ec4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -760,7 +760,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea int numColumns; int32_t blockIdx; SDataStatis* pBlockStatis = NULL; - SMemRow row = NULL; + STSRow* row = NULL; // restore last column data with last schema int err = 0; @@ -866,7 +866,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea pDataCol = pReadh->pDCols[0]->cols + 0; pCol = schemaColAt(pSchema, 0); tdAppendColVal(memRowDataBody(row), tdGetColDataOfRow(pDataCol, rowId), pCol->type, pCol->offset); - pLastCol->ts = memRowKey(row); + pLastCol->ts = TD_ROW_KEY(row); pTable->restoreColumnNum += 1; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 81f0a6736b..b51539f697 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -22,7 +22,7 @@ static void tsdbFreeTbData(STbData *pTbData); static char * tsdbGetTsTupleKey(const void *data); static int tsdbTbDataComp(const void *arg1, const void *arg2); static char * tsdbTbDataGetUid(const void *arg); -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row); +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row); STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb) { STsdbMemTable *pMemTable = (STsdbMemTable *)calloc(1, sizeof(*pMemTable)); @@ -124,7 +124,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey TSKEY fKey = 0; bool isRowDel = false; int filterIter = 0; - SMemRow row = NULL; + STSRow * row = NULL; SMergeInfo mInfo; if (pMergeInfo == NULL) pMergeInfo = &mInfo; @@ -135,12 +135,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey if (pCols) tdResetDataCols(pCols); row = tsdbNextIterRow(pIter); - if (row == NULL || memRowKey(row) > maxKey) { + if (row == NULL || TD_ROW_KEY(row) > maxKey) { rowKey = INT64_MAX; isRowDel = false; } else { - rowKey = memRowKey(row); - isRowDel = memRowDeleted(row); + rowKey = TD_ROW_KEY(row); + isRowDel = TD_ROW_IS_DELETED(row); } if (filterIter >= nFilterKeys) { @@ -177,12 +177,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey tSkipListIterNext(pIter); row = tsdbNextIterRow(pIter); - if (row == NULL || memRowKey(row) > maxKey) { + if (row == NULL || TD_ROW_KEY(row) > maxKey) { rowKey = INT64_MAX; isRowDel = false; } else { - rowKey = memRowKey(row); - isRowDel = memRowDeleted(row); + rowKey = TD_ROW_KEY(row); + isRowDel = TD_ROW_IS_DELETED(row); } } else { if (isRowDel) { @@ -207,12 +207,12 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey tSkipListIterNext(pIter); row = tsdbNextIterRow(pIter); - if (row == NULL || memRowKey(row) > maxKey) { + if (row == NULL || TD_ROW_KEY(row) > maxKey) { rowKey = INT64_MAX; isRowDel = false; } else { - rowKey = memRowKey(row); - isRowDel = memRowDeleted(row); + rowKey = TD_ROW_KEY(row); + isRowDel = TD_ROW_IS_DELETED(row); } filterIter++; @@ -233,7 +233,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitMsg *pMsg) { SSubmitMsgIter msgIter = {0}; SSubmitBlk * pBlock = NULL; SSubmitBlkIter blkIter = {0}; - SMemRow row = NULL; + STSRow * row = NULL; TSKEY now = taosGetTimestamp(pTsdb->config.precision); TSKEY minKey = now - tsTickPerDay[pTsdb->config.precision] * pTsdb->config.keep; TSKEY maxKey = now + tsTickPerDay[pTsdb->config.precision] * pTsdb->config.daysPerFile; @@ -306,7 +306,7 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p STsdbMemTable *pMemTable = pTsdb->mem; void * tptr; STbData * pTbData; - SMemRow row; + STSRow * row; TSKEY keyMin; TSKEY keyMax; @@ -332,12 +332,12 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p tInitSubmitBlkIter(pBlock, &blkIter); if (blkIter.row == NULL) return 0; - keyMin = memRowKey(blkIter.row); + keyMin = TD_ROW_KEY(blkIter.row); tSkipListPutBatchByIter(pTbData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext); // Set statistics - keyMax = memRowKey(blkIter.row); + keyMax = TD_ROW_KEY(blkIter.row); pTbData->nrows += pBlock->numOfRows; if (pTbData->keyMin > keyMin) pTbData->keyMin = keyMin; @@ -347,7 +347,7 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p if (pMemTable->keyMin > keyMin) pMemTable->keyMin = keyMin; if (pMemTable->keyMax < keyMax) pMemTable->keyMax = keyMax; - // SMemRow lastRow = NULL; + // STSRow* lastRow = NULL; // int64_t osize = SL_SIZE(pTableData->pData); // tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow); // tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext); @@ -355,7 +355,7 @@ static int tsdbMemTableInsertTbData(STsdb *pTsdb, SSubmitBlk *pBlock, int32_t *p // (*pAffectedRows) += points; // if(lastRow != NULL) { - // TSKEY lastRowKey = memRowKey(lastRow); + // TSKEY lastRowKey = TD_ROW_KEY(lastRow); // if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey; // pMemTable->numOfRows += dsize; @@ -418,7 +418,7 @@ static void tsdbFreeTbData(STbData *pTbData) { } } -static char *tsdbGetTsTupleKey(const void *data) { return memRowKeys((SMemRow)data); } +static char *tsdbGetTsTupleKey(const void *data) { return (char *)TD_ROW_KEY_ADDR((STSRow *)data); } static int tsdbTbDataComp(const void *arg1, const void *arg2) { STbData *pTbData1 = (STbData *)arg1; @@ -437,17 +437,17 @@ static char *tsdbTbDataGetUid(const void *arg) { STbData *pTbData = (STbData *)arg; return (char *)(&(pTbData->uid)); } -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row) { +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow *row) { if (pCols) { - if (*ppSchema == NULL || schemaVersion(*ppSchema) != memRowVersion(row)) { - *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row)); + if (*ppSchema == NULL || schemaVersion(*ppSchema) != TD_ROW_SVER(row)) { + *ppSchema = tsdbGetTableSchemaImpl(pTable, false, false, TD_ROW_SVER(row)); if (*ppSchema == NULL) { ASSERT(false); return -1; } } - tdAppendMemRowToDataCol(row, *ppSchema, pCols, true); + tdAppendSTSRowToDataCol(row, *ppSchema, pCols, true); } return 0; @@ -479,7 +479,7 @@ int tsdbInsertDataToMemTable(STsdbMemTable *pMemTable, SSubmitMsg *pMsg) { typedef struct { int32_t totalLen; int32_t len; - SMemRow row; + STSRow* row; } SSubmitBlkIter; typedef struct { @@ -493,17 +493,17 @@ static void tsdbFreeMemTable(SMemTable *pMemTable); static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static void tsdbFreeTableData(STableData *pTableData); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row); +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, STSRow* row); static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter); -static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); +static STSRow* tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter); static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows); static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter); static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); -static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row); +static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, STSRow* row); -static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, +static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, STSRow* row, TSKEY minKey, TSKEY maxKey, TSKEY now); @@ -685,7 +685,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey TSKEY fKey = 0; bool isRowDel = false; int filterIter = 0; - SMemRow row = NULL; + STSRow* row = NULL; SMergeInfo mInfo; if (pMergeInfo == NULL) pMergeInfo = &mInfo; @@ -696,11 +696,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey if (pCols) tdResetDataCols(pCols); row = tsdbNextIterRow(pIter); - if (row == NULL || memRowKey(row) > maxKey) { + if (row == NULL || TD_ROW_KEY(row) > maxKey) { rowKey = INT64_MAX; isRowDel = false; } else { - rowKey = memRowKey(row); + rowKey = TD_ROW_KEY(row); isRowDel = memRowDeleted(row); } @@ -738,11 +738,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey tSkipListIterNext(pIter); row = tsdbNextIterRow(pIter); - if (row == NULL || memRowKey(row) > maxKey) { + if (row == NULL || TD_ROW_KEY(row) > maxKey) { rowKey = INT64_MAX; isRowDel = false; } else { - rowKey = memRowKey(row); + rowKey = TD_ROW_KEY(row); isRowDel = memRowDeleted(row); } } else { @@ -768,11 +768,11 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey tSkipListIterNext(pIter); row = tsdbNextIterRow(pIter); - if (row == NULL || memRowKey(row) > maxKey) { + if (row == NULL || TD_ROW_KEY(row) > maxKey) { rowKey = INT64_MAX; isRowDel = false; } else { - rowKey = memRowKey(row); + rowKey = TD_ROW_KEY(row); isRowDel = memRowDeleted(row); } @@ -790,9 +790,9 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey // ---------------- LOCAL FUNCTIONS ---------------- -static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, +static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, STSRow* row, TSKEY minKey, TSKEY maxKey, TSKEY now) { - TSKEY rowKey = memRowKey(row); + TSKEY rowKey = TD_ROW_KEY(row); if (rowKey < minKey || rowKey > maxKey) { tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64 " maxKey %" PRId64 " row key %" PRId64, @@ -807,9 +807,9 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMem //row1 has higher priority -static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo, +static STSRow* tsdbInsertDupKeyMerge(STSRow* row1, STSRow* row2, STsdbRepo* pRepo, STSchema **ppSchema1, STSchema **ppSchema2, - STable* pTable, int32_t* pPoints, SMemRow* pLastRow) { + STable* pTable, int32_t* pPoints, STSRow** pLastRow) { //for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows! if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) { @@ -819,10 +819,10 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), - memRowKey(row1)); + TD_ROW_KEY(row1)); if(row2 == NULL || pRepo->config.update != TD_ROW_PARTIAL_UPDATE) { - void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1)); + void* pMem = tsdbAllocBytes(pRepo, TD_ROW_LEN(row1)); if(pMem == NULL) return NULL; memRowCpy(pMem, row1); (*pPoints)++; @@ -853,9 +853,9 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep } } - SMemRow tmp = tsdbMergeTwoRows(pBuf, row1, row2, pSchema1, pSchema2); + STSRow* tmp = tsdbMergeTwoRows(pBuf, row1, row2, pSchema1, pSchema2); - void* pMem = tsdbAllocBytes(pRepo, memRowTLen(tmp)); + void* pMem = tsdbAllocBytes(pRepo, TD_ROW_LEN(tmp)); if(pMem == NULL) return NULL; memRowCpy(pMem, tmp); @@ -868,7 +868,7 @@ static void* tsdbInsertDupKeyMergePacked(void** args) { return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7]); } -static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pPoints, SMemRow* pLastRow) { +static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pPoints, STSRow** pLastRow) { if(pSkipList->insertHandleFn == NULL) { tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9); @@ -953,7 +953,7 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT return 0; } -static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow row) { +static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, STSRow* row) { tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, memRowVersion(row)); @@ -1002,30 +1002,30 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro assert(pTCol->bytes >= bytes); memcpy(pDataCol->pData, value, bytes); //tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData); - pDataCol->ts = memRowKey(row); + pDataCol->ts = TD_ROW_KEY(row); // unlock TSDB_WUNLOCK_TABLE(pTable); } } -static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row) { +static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, STSRow* row) { STsdbCfg *pCfg = &pRepo->config; // if cacheLastRow config has been reset, free the lastRow if (!pCfg->cacheLastRow && pTable->lastRow != NULL) { - SMemRow cachedLastRow = pTable->lastRow; + STSRow* cachedLastRow = pTable->lastRow; TSDB_WLOCK_TABLE(pTable); pTable->lastRow = NULL; TSDB_WUNLOCK_TABLE(pTable); taosTZfree(cachedLastRow); } - if (tsdbGetTableLastKeyImpl(pTable) <= memRowKey(row)) { + if (tsdbGetTableLastKeyImpl(pTable) <= TD_ROW_KEY(row)) { if (CACHE_LAST_ROW(pCfg) || pTable->lastRow != NULL) { - SMemRow nrow = pTable->lastRow; - if (taosTSizeof(nrow) < memRowTLen(row)) { - SMemRow orow = nrow; - nrow = taosTMalloc(memRowTLen(row)); + STSRow* nrow = pTable->lastRow; + if (taosTSizeof(nrow) < TD_ROW_LEN(row)) { + STSRow* orow = nrow; + nrow = taosTMalloc(TD_ROW_LEN(row)); if (nrow == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return -1; @@ -1033,18 +1033,18 @@ static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow r memRowCpy(nrow, row); TSDB_WLOCK_TABLE(pTable); - pTable->lastKey = memRowKey(row); + pTable->lastKey = TD_ROW_KEY(row); pTable->lastRow = nrow; TSDB_WUNLOCK_TABLE(pTable); taosTZfree(orow); } else { TSDB_WLOCK_TABLE(pTable); - pTable->lastKey = memRowKey(row); + pTable->lastKey = TD_ROW_KEY(row); memRowCpy(nrow, row); TSDB_WUNLOCK_TABLE(pTable); } } else { - pTable->lastKey = memRowKey(row); + pTable->lastKey = TD_ROW_KEY(row); } if (CACHE_LAST_NULL_COLUMN(pCfg)) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 3b4057a6e4..e76737a664 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -157,7 +157,7 @@ typedef struct STableGroupSupporter { static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static int32_t checkForCachedLastRow(STsdbReadHandle* pTsdbReadHandle, STableGroupInfo *groupList); static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle); -//static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey); +// static int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey); static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle); static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock); @@ -689,8 +689,8 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); assert(node != NULL); - SMemRow row = (SMemRow)SL_GET_NODE_DATA(node); - TSKEY key = memRowKey(row); // first timestamp in buffer + STSRow* row = (STSRow*)SL_GET_NODE_DATA(node); + TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s", pHandle, pCheckInfo->tableId, key, order, (*pMem)->keyMin, (*pMem)->keyMax, pCheckInfo->lastKey, (*pMem)->nrows, pHandle->idStr); @@ -709,8 +709,8 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); assert(node != NULL); - SMemRow row = (SMemRow)SL_GET_NODE_DATA(node); - TSKEY key = memRowKey(row); // first timestamp in buffer + STSRow* row = (STSRow*)SL_GET_NODE_DATA(node); + TSKEY key = TD_ROW_KEY(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 "-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %s", pHandle, pCheckInfo->tableId, key, order, (*pIMem)->keyMin, (*pIMem)->keyMax, pCheckInfo->lastKey, (*pIMem)->nrows, pHandle->idStr); @@ -733,18 +733,18 @@ static void destroyTableMemIterator(STableCheckInfo* pCheckInfo) { } static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, int32_t update) { - SMemRow rmem = NULL, rimem = NULL; + STSRow *rmem = NULL, *rimem = NULL; if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); if (node != NULL) { - rmem = (SMemRow)SL_GET_NODE_DATA(node); + rmem = (STSRow*)SL_GET_NODE_DATA(node); } } if (pCheckInfo->iiter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { - rimem = (SMemRow)SL_GET_NODE_DATA(node); + rimem = (STSRow*)SL_GET_NODE_DATA(node); } } @@ -754,16 +754,16 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, if (rmem != NULL && rimem == NULL) { pCheckInfo->chosen = CHECKINFO_CHOSEN_MEM; - return memRowKey(rmem); + return TD_ROW_KEY(rmem); } if (rmem == NULL && rimem != NULL) { pCheckInfo->chosen = CHECKINFO_CHOSEN_IMEM; - return memRowKey(rimem); + return TD_ROW_KEY(rimem); } - TSKEY r1 = memRowKey(rmem); - TSKEY r2 = memRowKey(rimem); + TSKEY r1 = TD_ROW_KEY(rmem); + TSKEY r2 = TD_ROW_KEY(rimem); if (r1 == r2) { if(update == TD_ROW_DISCARD_UPDATE){ @@ -787,19 +787,19 @@ static TSKEY extractFirstTraverseKey(STableCheckInfo* pCheckInfo, int32_t order, } } -static SMemRow getSMemRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, SMemRow* extraRow) { - SMemRow rmem = NULL, rimem = NULL; +static STSRow* getSRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, int32_t update, STSRow** extraRow) { + STSRow *rmem = NULL, *rimem = NULL; if (pCheckInfo->iter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter); if (node != NULL) { - rmem = (SMemRow)SL_GET_NODE_DATA(node); + rmem = (STSRow*)SL_GET_NODE_DATA(node); } } if (pCheckInfo->iiter) { SSkipListNode* node = tSkipListIterGet(pCheckInfo->iiter); if (node != NULL) { - rimem = (SMemRow)SL_GET_NODE_DATA(node); + rimem = (STSRow*)SL_GET_NODE_DATA(node); } } @@ -817,8 +817,8 @@ static SMemRow getSMemRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, return rimem; } - TSKEY r1 = memRowKey(rmem); - TSKEY r2 = memRowKey(rimem); + TSKEY r1 = TD_ROW_KEY(rmem); + TSKEY r2 = TD_ROW_KEY(rimem); if (r1 == r2) { if (update == TD_ROW_DISCARD_UPDATE) { @@ -831,7 +831,7 @@ static SMemRow getSMemRowInTableMem(STableCheckInfo* pCheckInfo, int32_t order, return rmem; } else { pCheckInfo->chosen = CHECKINFO_CHOSEN_BOTH; - extraRow = rimem; + *extraRow = rimem; return rmem; } } else { @@ -904,12 +904,12 @@ static bool hasMoreDataInCache(STsdbReadHandle* pHandle) { initTableMemIterator(pHandle, pCheckInfo); } - SMemRow row = getSMemRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL); + STSRow* row = getSRowInTableMem(pCheckInfo, pHandle->order, pCfg->update, NULL); if (row == NULL) { return false; } - pCheckInfo->lastKey = memRowKey(row); // first timestamp in buffer + pCheckInfo->lastKey = TD_ROW_KEY(row); // first timestamp in buffer tsdbDebug("%p uid:%" PRId64", check data in buffer from skey:%" PRId64 ", order:%d, %s", pHandle, pCheckInfo->tableId, pCheckInfo->lastKey, pHandle->order, pHandle->idStr); @@ -1418,8 +1418,11 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t // todo refactor, only copy one-by-one for (int32_t k = start; k < num + start; ++k) { - const char* p = tdGetColDataOfRow(src, k); - memcpy(dst, p, varDataTLen(p)); + SCellVal sVal = {0}; + if(tdGetColDataOfRow(&sVal, src, k) < 0){ + TASSERT(0); + } + memcpy(dst, sVal.val, varDataTLen(sVal.val)); dst += bytes; } } @@ -1470,16 +1473,17 @@ static int32_t doCopyRowsFromFileBlock(STsdbReadHandle* pTsdbReadHandle, int32_t } // Note: row1 always has high priority -static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, - SMemRow row1, SMemRow row2, int32_t numOfCols, uint64_t uid, - STSchema* pSchema1, STSchema* pSchema2, bool forceSetNull) { +static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacity, int32_t numOfRows, STSRow* row1, + STSRow* row2, int32_t numOfCols, uint64_t uid, STSchema* pSchema1, STSchema* pSchema2, + bool forceSetNull) { +#if 0 char* pData = NULL; STSchema* pSchema; - SMemRow row; + STSRow* row; int16_t colId; int16_t offset; - bool isRow1DataRow = isDataRow(row1); + bool isRow1DataRow = TD_IS_TP_ROW(row1); bool isRow2DataRow; bool isChosenRowDataRow; int32_t chosen_itr; @@ -1495,19 +1499,19 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit if(isRow1DataRow) { numOfColsOfRow1 = schemaNCols(pSchema1); } else { - numOfColsOfRow1 = kvRowNCols(memRowKvBody(row1)); + numOfColsOfRow1 = TD_ROW_NCOLS(row1); } int32_t numOfColsOfRow2 = 0; if(row2) { - isRow2DataRow = isDataRow(row2); + isRow2DataRow = TD_IS_TP_ROW(row2); if (pSchema2 == NULL) { pSchema2 = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, uid, 0); } if(isRow2DataRow) { numOfColsOfRow2 = schemaNCols(pSchema2); } else { - numOfColsOfRow2 = kvRowNCols(memRowKvBody(row2)); + numOfColsOfRow2 = TD_ROW_NCOLS(row2); } } @@ -1669,6 +1673,7 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit i++; } } +#endif } static void moveDataToFront(STsdbReadHandle* pTsdbReadHandle, int32_t numOfRows, int32_t numOfCols) { @@ -1851,13 +1856,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf } else if (pCheckInfo->iter != NULL || pCheckInfo->iiter != NULL) { SSkipListNode* node = NULL; do { - SMemRow row2 = NULL; - SMemRow row1 = getSMemRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2); + STSRow* row2 = NULL; + STSRow* row1 = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, &row2); if (row1 == NULL) { break; } - TSKEY key = memRowKey(row1); + TSKEY key = TD_ROW_KEY(row1); if ((key > pTsdbReadHandle->window.ekey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (key < pTsdbReadHandle->window.ekey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { break; @@ -1870,13 +1875,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (key > tsArray[pos] && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { - if (rv1 != memRowVersion(row1)) { + if (rv1 != TD_ROW_SVER(row1)) { // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); - rv1 = memRowVersion(row1); + rv1 = TD_ROW_SVER(row1); } - if(row2 && rv2 != memRowVersion(row2)) { + if(row2 && rv2 != TD_ROW_SVER(row2)) { // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); - rv2 = memRowVersion(row2); + rv2 = TD_ROW_SVER(row2); } mergeTwoRowFromMem(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pCheckInfo->tableId, pSchema1, pSchema2, true); @@ -1895,13 +1900,13 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf if(pCfg->update == TD_ROW_PARTIAL_UPDATE) { doCopyRowsFromFileBlock(pTsdbReadHandle, pTsdbReadHandle->outputCapacity, numOfRows, pos, pos); } - if (rv1 != memRowVersion(row1)) { + if (rv1 != TD_ROW_SVER(row1)) { // pSchema1 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row1)); - rv1 = memRowVersion(row1); + rv1 = TD_ROW_SVER(row1); } - if(row2 && rv2 != memRowVersion(row2)) { + if(row2 && rv2 != TD_ROW_SVER(row2)) { // pSchema2 = tsdbGetTableSchemaByVersion(pTable, memRowVersion(row2)); - rv2 = memRowVersion(row2); + rv2 = TD_ROW_SVER(row2); } bool forceSetNull = pCfg->update != TD_ROW_PARTIAL_UPDATE; @@ -1954,9 +1959,9 @@ static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInf * copy them all to result buffer, since it may be overlapped with file data block. */ if (node == NULL || - ((memRowKey((SMemRow)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && + ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) > pTsdbReadHandle->window.ekey) && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || - ((memRowKey((SMemRow)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && + ((TD_ROW_KEY((STSRow*)SL_GET_NODE_DATA(node)) < pTsdbReadHandle->window.ekey) && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { // no data in cache or data in cache is greater than the ekey of time window, load data from file block if (cur->win.skey == TSKEY_INITIAL_VAL) { @@ -2541,12 +2546,12 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int STSchema* pSchema = NULL; do { - SMemRow row = getSMemRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL); + STSRow* row = getSRowInTableMem(pCheckInfo, pTsdbReadHandle->order, pCfg->update, NULL); if (row == NULL) { break; } - TSKEY key = memRowKey(row); + TSKEY key = TD_ROW_KEY(row); if ((key > maxKey && ASCENDING_TRAVERSE(pTsdbReadHandle->order)) || (key < maxKey && !ASCENDING_TRAVERSE(pTsdbReadHandle->order))) { tsdbDebug("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pTsdbReadHandle, key, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey); @@ -2559,9 +2564,9 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } win->ekey = key; - if (rv != memRowVersion(row)) { + if (rv != TD_ROW_SVER(row)) { pSchema = metaGetTbTSchema(pTsdbReadHandle->pTsdb->pMeta, pCheckInfo->tableId, 0); - rv = memRowVersion(row); + rv = TD_ROW_SVER(row); } mergeTwoRowFromMem(pTsdbReadHandle, maxRowsToRead, numOfRows, row, NULL, numOfCols, pCheckInfo->tableId, pSchema, NULL, true); @@ -2684,7 +2689,7 @@ static bool loadCachedLastRow(STsdbReadHandle* pTsdbReadHandle) { SQueryFilePos* cur = &pTsdbReadHandle->cur; - SMemRow pRow = NULL; + STSRow* pRow = NULL; TSKEY key = TSKEY_INITIAL_VAL; int32_t step = ASCENDING_TRAVERSE(pTsdbReadHandle->order)? 1:-1; @@ -3093,7 +3098,7 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) { * if lastRow == NULL, return TSDB_CODE_TDB_NO_CACHE_LAST_ROW * else set pRes and return TSDB_CODE_SUCCESS and save lastKey */ -//int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey) { +// int32_t tsdbGetCachedLastRow(STable* pTable, STSRow** pRes, TSKEY* lastKey) { // int32_t code = TSDB_CODE_SUCCESS; // // TSDB_RLOCK_TABLE(pTable); @@ -3110,7 +3115,7 @@ bool tsdbGetExternalRow(tsdbReaderT pHandle) { // } // } // -//out: +// out: // TSDB_RUNLOCK_TABLE(pTable); // return code; //} diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index 24c71fdc7e..c9bc540a18 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -21,7 +21,8 @@ static void tsdbResetReadTable(SReadH *pReadh); static void tsdbResetReadFile(SReadH *pReadh); static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols); static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, - int maxPoints, char *buffer, int bufferSize); + int numOfBitmaps, int lenOfBitmaps, int maxPoints, char *buffer, + int bufferSize); static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDataCols, int16_t *colIds, int numOfColIds); static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol); @@ -463,6 +464,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat // Recover the data int ccol = 0; // loop iter for SBlockCol object int dcol = 0; // loop iter for SDataCols object + int nBitmaps = (int)TD_BITMAP_BYTES(pBlock->numOfRows); + SBlockCol *pBlockCol = NULL; while (dcol < pDataCols->numOfCols) { SDataCol *pDataCol = &(pDataCols->cols[dcol]); if (dcol != 0 && ccol >= pBlockData->numOfCols) { @@ -477,12 +480,26 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat int32_t tlen = pBlock->keyLen; if (dcol != 0) { - SBlockCol *pBlockCol = &(pBlockData->cols[ccol]); + pBlockCol = &(pBlockData->cols[ccol]); tcolId = pBlockCol->colId; toffset = tsdbGetBlockColOffset(pBlockCol); tlen = pBlockCol->len; + pDataCol->bitmap = pBlockCol->bitmap; } else { ASSERT(pDataCol->colId == tcolId); + pDataCol->bitmap = 1; + } + + int32_t tBitmaps = 0; + int32_t tLenBitmap = 0; + if ((dcol != 0) && (pBlockCol->bitmap == 0)) { + if (IS_VAR_DATA_TYPE(pDataCol->type)) { + tBitmaps = nBitmaps; + tLenBitmap = tBitmaps; + } else { + tBitmaps = (int32_t)ceil((double)nBitmaps / TYPE_BYTES[pDataCol->type]); + tLenBitmap = tBitmaps * TYPE_BYTES[pDataCol->type]; + } } if (tcolId == pDataCol->colId) { @@ -492,7 +509,7 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat } if (tsdbCheckAndDecodeColumnData(pDataCol, POINTER_SHIFT(pBlockData, tsize + toffset), tlen, pBlock->algorithm, - pBlock->numOfRows, pDataCols->maxPoints, TSDB_READ_COMP_BUF(pReadh), + pBlock->numOfRows, tBitmaps, tLenBitmap, pDataCols->maxPoints, TSDB_READ_COMP_BUF(pReadh), (int)taosTSizeof(TSDB_READ_COMP_BUF(pReadh))) < 0) { tsdbError("vgId:%d file %s is broken at column %d block offset %" PRId64 " column offset %u", TSDB_READ_REPO_ID(pReadh), TSDB_FILE_FULL_NAME(pDFile), tcolId, (int64_t)pBlock->offset, toffset); @@ -516,7 +533,8 @@ static int tsdbLoadBlockDataImpl(SReadH *pReadh, SBlock *pBlock, SDataCols *pDat } static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32_t len, int8_t comp, int numOfRows, - int maxPoints, char *buffer, int bufferSize) { + int numOfBitmaps, int lenOfBitmaps, int maxPoints, char *buffer, + int bufferSize) { if (!taosCheckChecksumWhole((uint8_t *)content, len)) { terrno = TSDB_CODE_TDB_FILE_CORRUPTED; return -1; @@ -527,8 +545,9 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 // Decode the data if (comp) { // Need to decompress - int tlen = (*(tDataTypes[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows, pDataCol->pData, - pDataCol->spaceSize, comp, buffer, bufferSize); + int tlen = + (*(tDataTypes[pDataCol->type].decompFunc))(content, len - sizeof(TSCKSUM), numOfRows + numOfBitmaps, + pDataCol->pData, pDataCol->spaceSize, comp, buffer, bufferSize); if (tlen <= 0) { tsdbError("Failed to decompress column, file corrupted, len:%d comp:%d numOfRows:%d maxPoints:%d bufferSize:%d", len, comp, numOfRows, maxPoints, bufferSize); @@ -542,9 +561,22 @@ static int tsdbCheckAndDecodeColumnData(SDataCol *pDataCol, void *content, int32 memcpy(pDataCol->pData, content, pDataCol->len); } - if (IS_VAR_DATA_TYPE(pDataCol->type)) { + if (lenOfBitmaps > 0) { + pDataCol->len -= lenOfBitmaps; + + void *pSrcBitmap = NULL; + if (IS_VAR_DATA_TYPE(pDataCol->type)) { + pSrcBitmap = dataColSetOffset(pDataCol, numOfRows); + } else { + pSrcBitmap = POINTER_SHIFT(pDataCol->pData, numOfRows * TYPE_BYTES[pDataCol->type]); + } + void *pDestBitmap = POINTER_SHIFT(pDataCol->pData, pDataCol->bytes * maxPoints); + // restore the bitmap parts + memcpy(pDestBitmap, pSrcBitmap, lenOfBitmaps); + } else if (IS_VAR_DATA_TYPE(pDataCol->type)) { dataColSetOffset(pDataCol, numOfRows); } + return 0; } @@ -590,6 +622,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { // load the key row blockCol.colId = colId; + blockCol.bitmap = 0; // default is NORM for the primary key column blockCol.len = pBlock->keyLen; blockCol.type = pDataCol->type; blockCol.offset = TSDB_KEY_COL_OFFSET; @@ -617,6 +650,8 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * } ASSERT(pBlockCol->colId == pDataCol->colId); + // set the bitmap + pDataCol->bitmap = pBlockCol->bitmap; } if (tsdbLoadColData(pReadh, pDFile, pBlock, pBlockCol, pDataCol) < 0) return -1; @@ -630,7 +665,22 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc STsdb * pRepo = TSDB_READ_REPO(pReadh); STsdbCfg *pCfg = REPO_CFG(pRepo); - int tsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; + + int nBitmaps = (int)TD_BITMAP_BYTES(pBlock->numOfRows); + int32_t tBitmaps = 0; + int32_t tLenBitmap = 0; + + if (pBlockCol->bitmap == 0) { + if (IS_VAR_DATA_TYPE(pDataCol->type)) { + tBitmaps = nBitmaps; + tLenBitmap = tBitmaps; + } else { + tBitmaps = (int32_t)ceil((double)nBitmaps / TYPE_BYTES[pDataCol->type]); + tLenBitmap = tBitmaps * TYPE_BYTES[pDataCol->type]; + } + } + + int tsize = pDataCol->bytes * pBlock->numOfRows + tLenBitmap + COMP_OVERFLOW_BYTES; if (tsdbMakeRoom((void **)(&TSDB_READ_BUF(pReadh)), pBlockCol->len) < 0) return -1; if (tsdbMakeRoom((void **)(&TSDB_READ_COMP_BUF(pReadh)), tsize) < 0) return -1; @@ -658,7 +708,8 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBloc } if (tsdbCheckAndDecodeColumnData(pDataCol, pReadh->pBuf, pBlockCol->len, pBlock->algorithm, pBlock->numOfRows, - pCfg->maxRowsPerFileBlock, pReadh->pCBuf, (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { + tBitmaps, tLenBitmap, pCfg->maxRowsPerFileBlock, pReadh->pCBuf, + (int32_t)taosTSizeof(pReadh->pCBuf)) < 0) { tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), pBlockCol->colId, offset); return -1; diff --git a/source/libs/parser/inc/dataBlockMgt.h b/source/libs/parser/inc/dataBlockMgt.h index 637781922b..f93daa7f93 100644 --- a/source/libs/parser/inc/dataBlockMgt.h +++ b/source/libs/parser/inc/dataBlockMgt.h @@ -80,44 +80,37 @@ typedef struct STableDataBlocks { STagData tagData; SParsedDataColInfo boundColumnInfo; - SMemRowBuilder rowBuilder; + SRowBuilder rowBuilder; } STableDataBlocks; -static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) { - memRowSetType(row, memRowType); - if (isDataRowT(memRowType)) { - dataRowSetVersion(memRowDataBody(row), pBlock->pTableMeta->sversion); - dataRowSetLen(memRowDataBody(row), (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBlock->boundColumnInfo.flen)); - } else { - ASSERT(nBoundCols > 0); - memRowSetKvVersion(row, pBlock->pTableMeta->sversion); - kvRowSetNCols(memRowKvBody(row), nBoundCols); - kvRowSetLen(memRowKvBody(row), (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols)); - } -} - static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) { - ASSERT(pBlock->rowSize == pBlock->pTableMeta->tableInfo.rowSize); - return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen; + STableComInfo *pTableInfo = &pBlock->pTableMeta->tableInfo; + ASSERT(pBlock->rowSize == pTableInfo->rowSize); + return pBlock->rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + pBlock->boundColumnInfo.extendedVarLen + + (int32_t)TD_BITMAP_BYTES(pTableInfo->numOfColumns - 1); } -static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowType, SParsedDataColInfo *spd, - int32_t idx, int32_t *toffset) { +static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t rowType, SParsedDataColInfo *spd, + int32_t idx, int32_t *toffset, int32_t *colIdx) { int32_t schemaIdx = 0; if (IS_DATA_COL_ORDERED(spd)) { schemaIdx = spd->boundedColumns[idx] - 1; - if (isDataRowT(memRowType)) { + if (TD_IS_TP_ROW_T(rowType)) { *toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart + *colIdx = schemaIdx; } else { *toffset = idx * sizeof(SColIdx); // the offset of SColIdx + *colIdx = idx; } } else { ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx); schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx; - if (isDataRowT(memRowType)) { + if (TD_IS_TP_ROW_T(rowType)) { *toffset = (spd->cols + schemaIdx)->toffset; + *colIdx = schemaIdx; } else { *toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx); + *colIdx = (spd->colIdxInfo + idx)->finalIdx; } } } @@ -141,7 +134,7 @@ void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t void destroyBoundColumnInfo(SParsedDataColInfo* pColList); void destroyBlockArrayList(SArray* pDataBlockList); void destroyBlockHashmap(SHashObj* pDataBlockHash); -int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo); +int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo); int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows); int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); diff --git a/source/libs/parser/src/dataBlockMgt.c b/source/libs/parser/src/dataBlockMgt.c index 117abdf380..8412580d71 100644 --- a/source/libs/parser/src/dataBlockMgt.c +++ b/source/libs/parser/src/dataBlockMgt.c @@ -174,81 +174,18 @@ int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int3 } static int32_t getRowExpandSize(STableMeta* pTableMeta) { - int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE; + int32_t result = TD_ROW_HEAD_LEN - sizeof(TSKEY); int32_t columns = getNumOfColumns(pTableMeta); SSchema* pSchema = getTableColumnSchema(pTableMeta); - for (int32_t i = 0; i < columns; i++) { + for (int32_t i = 0; i < columns; ++i) { if (IS_VAR_DATA_TYPE((pSchema + i)->type)) { result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY]; } } + result += (int32_t)TD_BITMAP_BYTES(columns - 1); return result; } -/** - * TODO: Move to tdataformat.h and refactor when STSchema available. - * - fetch flen and toffset from STSChema and remove param spd - */ -static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, SParsedDataColInfo *spd) { - ASSERT(isKvRow(src)); - SKVRow kvRow = memRowKvBody(src); - SDataRow dataRow = memRowDataBody(dest); - - memRowSetType(dest, SMEM_ROW_DATA); - dataRowSetVersion(dataRow, memRowKvVersion(src)); - dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + spd->flen)); - - int32_t kvIdx = 0; - for (int i = 0; i < nCols; ++i) { - SSchema *schema = pSchema + i; - void * val = tdGetKVRowValOfColEx(kvRow, schema->colId, &kvIdx); - tdAppendDataColVal(dataRow, val != NULL ? val : getNullValue(schema->type), true, schema->type, - (spd->cols + i)->toffset); - } -} - -// TODO: Move to tdataformat.h and refactor when STSchema available. -static FORCE_INLINE void convertToSKVRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, int nBoundCols, SParsedDataColInfo *spd) { - ASSERT(isDataRow(src)); - - SDataRow dataRow = memRowDataBody(src); - SKVRow kvRow = memRowKvBody(dest); - - memRowSetType(dest, SMEM_ROW_KV); - memRowSetKvVersion(kvRow, dataRowVersion(dataRow)); - kvRowSetNCols(kvRow, nBoundCols); - kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols)); - - int32_t toffset = 0, kvOffset = 0; - for (int i = 0; i < nCols; ++i) { - if ((spd->cols + i)->valStat == VAL_STAT_HAS) { - SSchema *schema = pSchema + i; - toffset = (spd->cols + i)->toffset; - void *val = tdGetRowDataOfCol(dataRow, schema->type, toffset + TD_DATA_ROW_HEAD_SIZE); - tdAppendKvColVal(kvRow, val, true, schema->colId, schema->type, kvOffset); - kvOffset += sizeof(SColIdx); - } - } -} - -// TODO: Move to tdataformat.h and refactor when STSchema available. -static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlocks *pBlock) { - STableMeta * pTableMeta = pBlock->pTableMeta; - STableComInfo tinfo = getTableInfo(pTableMeta); - SSchema * pSchema = getTableColumnSchema(pTableMeta); - SParsedDataColInfo *spd = &pBlock->boundColumnInfo; - - ASSERT(dest != src); - - if (isDataRow(src)) { - // TODO: Can we use pBlock -> numOfParam directly? - ASSERT(spd->numOfBound > 0); - convertToSKVRow(dest, src, pSchema, tinfo.numOfColumns, spd->numOfBound, spd); - } else { - convertToSDataRow(dest, src, pSchema, tinfo.numOfColumns, spd); - } -} - static void destroyDataBlock(STableDataBlocks* pDataBlock) { if (pDataBlock == NULL) { return; @@ -361,7 +298,7 @@ int sortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlkKey char * pBlockData = pBlocks->data; int n = 0; while (n < nRows) { - pBlkKeyTuple->skey = memRowKey(pBlockData); + pBlkKeyTuple->skey = TD_ROW_KEY((STSRow *)pBlockData); pBlkKeyTuple->payloadAddr = pBlockData; // next loop @@ -446,27 +383,29 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB int32_t numOfRows = pBlock->numOfRows; if (isRawPayload) { - for (int32_t i = 0; i < numOfRows; ++i) { - SMemRow memRow = (SMemRow)pDataBlock; - memRowSetType(memRow, SMEM_ROW_DATA); - SDataRow trow = memRowDataBody(memRow); - dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen)); - dataRowSetVersion(trow, pTableMeta->sversion); + SRowBuilder builder = {0}; + + tdSRowInit(&builder, pTableMeta->sversion); + tdSRowSetInfo(&builder, getNumOfColumns(pTableMeta), -1, flen); + for (int32_t i = 0; i < numOfRows; ++i) { + tdSRowResetBuf(&builder, pDataBlock); int toffset = 0; - for (int32_t j = 0; j < tinfo.numOfColumns; j++) { - tdAppendColVal(trow, p, pSchema[j].type, toffset); - toffset += TYPE_BYTES[pSchema[j].type]; + for (int32_t j = 0; j < tinfo.numOfColumns; ++j) { + int8_t colType = pSchema[j].type; + uint8_t valType = isNull(p, colType) ? TD_VTYPE_NULL : TD_VTYPE_NORM; + tdAppendColValToRow(&builder, pSchema[j].colId, colType, valType, p, true, toffset, j); + toffset += TYPE_BYTES[colType]; p += pSchema[j].bytes; } - - pDataBlock = (char*)pDataBlock + memRowTLen(memRow); - pBlock->dataLen += memRowTLen(memRow); + int32_t rowLen = TD_ROW_LEN((STSRow*)pDataBlock); + pDataBlock = (char*)pDataBlock + rowLen; + pBlock->dataLen += rowLen; } } else { for (int32_t i = 0; i < numOfRows; ++i) { char* payload = (blkKeyTuple + i)->payloadAddr; - TDRowLenT rowTLen = memRowTLen(payload); + TDRowLenT rowTLen = TD_ROW_LEN((STSRow*)payload); memcpy(pDataBlock, payload, rowTLen); pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); pBlock->dataLen += rowTLen; @@ -587,16 +526,10 @@ int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t return TSDB_CODE_SUCCESS; } -int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo) { - ASSERT(nRows >= 0 && pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols)); - - uint32_t dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen; - uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundNullLen; - if (isUtilizeKVRow(kvLen, dataLen)) { - pBuilder->memRowType = SMEM_ROW_KV; - } else { - pBuilder->memRowType = SMEM_ROW_DATA; - } - +int initRowBuilder(SRowBuilder *pBuilder, int16_t schemaVer, SParsedDataColInfo *pColInfo) { + ASSERT(pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols)); + tdSRowInit(pBuilder, schemaVer); + tdSRowSetExtendedInfo(pBuilder, pColInfo->numOfCols, pColInfo->numOfBound, pColInfo->flen, pColInfo->allNullLen, + pColInfo->boundNullLen); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index a3e08a2e7e..e7f3be6049 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -259,28 +259,30 @@ static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time } typedef struct SMemParam { - SMemRow row; + SRowBuilder* rb; SSchema* schema; int32_t toffset; + int32_t colIdx; } SMemParam; -static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *param) { - SMemParam* pa = (SMemParam*)param; +static FORCE_INLINE int32_t MemRowAppend(const void* value, int32_t len, void* param) { + SMemParam* pa = (SMemParam*)param; + SRowBuilder* rb = pa->rb; if (TSDB_DATA_TYPE_BINARY == pa->schema->type) { - char *rowEnd = memRowEnd(pa->row); + const char* rowEnd = tdRowEnd(rb->pBuf); STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len); - tdAppendMemRowColVal(pa->row, rowEnd, true, pa->schema->colId, pa->schema->type, pa->toffset); + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); } else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) { // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' - int32_t output = 0; - char * rowEnd = memRowEnd(pa->row); - if (!taosMbsToUcs4(value, len, (char *)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { + int32_t output = 0; + const char* rowEnd = tdRowEnd(rb->pBuf); + if (!taosMbsToUcs4(value, len, (char*)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) { return TSDB_CODE_TSC_SQL_SYNTAX_ERROR; } varDataSetLen(rowEnd, output); - tdAppendMemRowColVal(pa->row, rowEnd, false, pa->schema->colId, pa->schema->type, pa->toffset); + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); } else { - tdAppendMemRowColVal(pa->row, value, true, pa->schema->colId, pa->schema->type, pa->toffset); + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, true, pa->toffset, pa->colIdx); } return TSDB_CODE_SUCCESS; } @@ -408,26 +410,27 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken) return TSDB_CODE_SUCCESS; } -static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) { +static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) { SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo; - SMemRowBuilder* pBuilder = &pDataBlocks->rowBuilder; - char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header - initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound); + SRowBuilder* pBuilder = &pDataBlocks->rowBuilder; + STSRow* row = (STSRow*)(pDataBlocks->pData + pDataBlocks->size); // skip the SSubmitBlk header + + tdSRowResetBuf(pBuilder, row); bool isParseBindParam = false; SSchema* schema = getTableColumnSchema(pDataBlocks->pTableMeta); - SMemParam param = {.row = row}; + SMemParam param = {.rb = pBuilder}; SToken sToken = {0}; // 1. set the parsed value from sql string for (int i = 0; i < spd->numOfBound; ++i) { NEXT_TOKEN(pCxt->pSql, sToken); SSchema *pSchema = &schema[spd->boundedColumns[i] - 1]; param.schema = pSchema; - getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, ¶m.toffset); + getMemRowAppendInfo(schema, pBuilder->rowType, spd, i, ¶m.toffset, ¶m.colIdx); CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, ¶m, &pCxt->msg)); if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) { - TSKEY tsKey = memRowKey(row); + TSKEY tsKey = TD_ROW_KEY(row); if (checkTimestamp(pDataBlocks, (const char *)&tsKey) != TSDB_CODE_SUCCESS) { buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", sToken.z); return TSDB_CODE_TSC_INVALID_TIME_STAMP; @@ -437,17 +440,17 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, if (!isParseBindParam) { // set the null value for the columns that do not assign values - if ((spd->numOfBound < spd->numOfCols) && isDataRow(row)) { - SDataRow dataRow = memRowDataBody(row); + if ((spd->numOfBound < spd->numOfCols) && TD_IS_TP_ROW(row)) { for (int32_t i = 0; i < spd->numOfCols; ++i) { - if (spd->cols[i].valStat == VAL_STAT_NONE) { - tdAppendDataColVal(dataRow, getNullValue(schema[i].type), true, schema[i].type, spd->cols[i].toffset); + if (spd->cols[i].valStat == VAL_STAT_NONE) { // the primary TS key is not VAL_STAT_NONE + tdAppendColValToTpRow(pBuilder, TD_VTYPE_NONE, getNullValue(schema[i].type), true, schema[i].type, i, + spd->cols[i].toffset); } } } } - *len = pBuilder->rowSize; + *len = pBuilder->extendedRowSize; return TSDB_CODE_SUCCESS; } @@ -455,7 +458,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) { STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta); int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - CHECK_CODE(initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo)); + CHECK_CODE(initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo)); (*numOfRows) = 0; char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 4fa27bbaeb..4fd0de3803 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -518,7 +518,7 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int // // if (IS_RAW_PAYLOAD(insertParam->payloadType)) { // for (int32_t i = 0; i < numOfRows; ++i) { -// SMemRow memRow = (SMemRow)pDataBlock; +// STSRow* memRow = (STSRow*)pDataBlock; // memRowSetType(memRow, SMEM_ROW_DATA); // SDataRow trow = memRowDataBody(memRow); // dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen)); @@ -531,13 +531,13 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int // p += pSchema[j].bytes; // } // -// pDataBlock = (char*)pDataBlock + memRowTLen(memRow); -// pBlock->dataLen += memRowTLen(memRow); +// pDataBlock = (char*)pDataBlock + TD_ROW_LEN(memRow); +// pBlock->dataLen += TD_ROW_LEN(memRow); // } // } else { // for (int32_t i = 0; i < numOfRows; ++i) { // char* payload = (blkKeyTuple + i)->payloadAddr; -// TDRowLenT rowTLen = memRowTLen(payload); +// TDRowLenT rowTLen = TD_ROW_LEN(payload); // memcpy(pDataBlock, payload, rowTLen); // pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); // pBlock->dataLen += rowTLen; -- GitLab