diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index b63646ce6476db2019b4fc5dee6afb792f0700bb..4d65ed3b70591a281ba465ac1b42999af147ec11 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -117,6 +117,7 @@ typedef struct SParsedDataColInfo { uint16_t flen; // TODO: get from STSchema uint16_t allNullLen; // TODO: get from STSchema uint16_t extendedVarLen; + uint16_t boundColsLen; // len of bound columns int32_t * boundedColumns; // bound column idx according to schema SBoundColumn * cols; SBoundIdxInfo *colIdxInfo; @@ -143,8 +144,7 @@ typedef enum { int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec); -int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, - int32_t allNullLen); +int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo); void destroyMemRowBuilder(SMemRowBuilder *pBuilder); /** @@ -529,12 +529,12 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) { return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen; } -static FORCE_INLINE bool checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) { +static FORCE_INLINE bool checkConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) { if (isDataRow(row)) { - if (kvLen < (dataLen * KVRatioConvert)) { + if (isConvertToKvRow(kvLen, dataLen)) { return true; } - } else if (kvLen > dataLen) { + } else if (isConvertToDataRow(kvLen, dataLen)) { return true; } return false; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 2f401fee2341bede1031d4a664dcf312de2f5b8f..280c1c16eb96a06da5a82f25a77a8e35be670abd 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -41,9 +41,8 @@ enum { static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows); static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema, char *str, char **end); -int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, - int32_t allNullLen) { - ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols)); +int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo) { + ASSERT(nRows >= 0 && pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols)); if (nRows > 0) { // already init(bind multiple rows by single column) if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) { @@ -52,11 +51,21 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 } // default compareStat is ROW_COMPARE_NO_NEED - if (nBoundCols == 0) { // file input + if (pColInfo->numOfBound == 0) { // file input pBuilder->memRowType = SMEM_ROW_DATA; return TSDB_CODE_SUCCESS; + } else if (pColInfo->extendedVarLen == 0) { + // all columns are NoVarType, we can calculate the memRowType precisely + uint32_t dataLen = pColInfo->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE; + uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundColsLen; + if (isConvertToKvRow(kvLen, dataLen)) { + pBuilder->memRowType = SMEM_ROW_KV; + } else { + pBuilder->memRowType = SMEM_ROW_DATA; + } + return TSDB_CODE_SUCCESS; } else { - float boundRatio = ((float)nBoundCols / (float)nCols); + float boundRatio = ((float)pColInfo->numOfBound / (float)pColInfo->numOfCols); if (boundRatio < KVRatioKV) { pBuilder->memRowType = SMEM_ROW_KV; @@ -74,7 +83,7 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 } } - pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx); + pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx); if (nRows > 0) { pBuilder->rowInfo = tcalloc(nRows, sizeof(SMemRowInfo)); @@ -83,7 +92,7 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 } for (int i = 0; i < nRows; ++i) { - (pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen; + (pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen; (pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen; } } @@ -553,7 +562,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i // 2. check and set convert flag bool isNeedConvertRow = false; if (pBuilder->compareStat == ROW_COMPARE_NEED) { - isNeedConvertRow = checkAndConvertMemRow(row, dataLen, kvLen); + isNeedConvertRow = checkConvertMemRow(row, dataLen, kvLen); } // 3. set the null value for the columns that do not assign values @@ -630,9 +639,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - if (TSDB_CODE_SUCCESS != - (code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound, - pDataBlock->boundColumnInfo.allNullLen))) { + if (TSDB_CODE_SUCCESS != (code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo))) { return code; } int32_t convertOffset = 0; @@ -1270,6 +1277,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat pColInfo->cols[t].valStat = VAL_STAT_HAS; pColInfo->boundedColumns[pColInfo->numOfBound] = t; ++pColInfo->numOfBound; + pColInfo->boundColsLen += pSchema[t].bytes; findColumnIndex = true; if (isOrdered && (lastColIdx > t)) { isOrdered = false; @@ -1293,6 +1301,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat pColInfo->cols[t].valStat = VAL_STAT_HAS; pColInfo->boundedColumns[pColInfo->numOfBound] = t; ++pColInfo->numOfBound; + pColInfo->boundColsLen += pSchema[t].bytes; findColumnIndex = true; if (isOrdered && (lastColIdx > t)) { isOrdered = false; diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index d93c0ce89582295557d90128e6484f6a5fe9ac2e..aaed74d677560d7b33b5a9dc0ae66f003fc0ec6f 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -625,6 +625,8 @@ typedef void *SMemRow; #define isKvRowT(t) (SMEM_ROW_KV == (((uint8_t)(t)) & 0x01)) #define isKvRow(r) (SMEM_ROW_KV == memRowType(r)) #define isNeedConvertRow(r) (((*(uint8_t *)(r)) & 0x80) == SMEM_ROW_CONVERT) +#define isConvertToKvRow(k, d) ((k) < ((d)*KVRatioConvert)) +#define isConvertToDataRow(k, d) ((k) > (d)) #define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag #define memRowKvBody(r) \